#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_' as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
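
  # Illustrative sketch only, not part of the original module: a concrete
  # BuildHooksEnv in a node-level LU would typically look like
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.node_name}
  #     nl = [self.cfg.GetMasterNode(), self.op.node_name]
  #     return env, nl, nl
  #
  # i.e. the environment dict, the pre-hook node list and the post-hook node
  # list; the "node_name" attribute is an assumed example parameter.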

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
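
# Example usage (illustrative only, not part of the original file): an LU
# that takes a list of node names would typically call, in CheckPrereq,
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
# after which self.nodes holds the fully-expanded, sorted node names.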


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env
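
# Illustrative note, not part of the original file: for an instance with one
# bridged NIC and one disk, the dict built above contains keys such as
#   INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES, INSTANCE_STATUS,
#   INSTANCE_NIC_COUNT, INSTANCE_NIC0_IP, INSTANCE_NIC0_MAC,
#   INSTANCE_NIC0_MODE, INSTANCE_NIC0_LINK, INSTANCE_NIC0_BRIDGE,
#   INSTANCE_DISK_COUNT, INSTANCE_DISK0_SIZE, INSTANCE_DISK0_MODE,
# which the hooks runner exports to hook scripts with a GANETI_ prefix
# (see the BuildHooksEnv docstring above).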

def _PreBuildNICHooksList(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics

def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance-related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _PreBuildNICHooksList(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if msg:
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
1266
  """Verifies the cluster disks status.
1267

1268
  """
1269
  _OP_REQP = []
1270
  REQ_BGL = False
1271

    
1272
  def ExpandNames(self):
1273
    self.needed_locks = {
1274
      locking.LEVEL_NODE: locking.ALL_SET,
1275
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1276
    }
1277
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1278

    
1279
  def CheckPrereq(self):
1280
    """Check prerequisites.
1281

1282
    This has no prerequisites.
1283

1284
    """
1285
    pass
1286

    
1287
  def Exec(self, feedback_fn):
1288
    """Verify integrity of cluster disks.
1289

1290
    @rtype: tuple of three items
1291
    @return: a tuple of (dict of node-to-node_error, list of instances
1292
        which need activate-disks, dict of instance: (node, volume) for
1293
        missing volumes
1294

1295
    """
1296
    result = res_nodes, res_instances, res_missing = {}, [], {}
1297

    
1298
    vg_name = self.cfg.GetVGName()
1299
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1300
    instances = [self.cfg.GetInstanceInfo(name)
1301
                 for name in self.cfg.GetInstanceList()]
1302

    
1303
    nv_dict = {}
1304
    for inst in instances:
1305
      inst_lvs = {}
1306
      if (not inst.admin_up or
1307
          inst.disk_template not in constants.DTS_NET_MIRROR):
1308
        continue
1309
      inst.MapLVsByNode(inst_lvs)
1310
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1311
      for node, vol_list in inst_lvs.iteritems():
1312
        for vol in vol_list:
1313
          nv_dict[(node, vol)] = inst
1314

    
1315
    if not nv_dict:
1316
      return result
1317

    
1318
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1319

    
1320
    to_act = set()
1321
    for node in nodes:
1322
      # node_volume
1323
      node_res = node_lvs[node]
1324
      if node_res.offline:
1325
        continue
1326
      msg = node_res.fail_msg
1327
      if msg:
1328
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1329
        res_nodes[node] = msg
1330
        continue
1331

    
1332
      lvs = node_res.payload
1333
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1334
        inst = nv_dict.pop((node, lv_name), None)
1335
        if (not lv_online and inst is not None
1336
            and inst.name not in res_instances):
1337
          res_instances.append(inst.name)
1338

    
1339
    # any leftover items in nv_dict are missing LVs, let's arrange the
1340
    # data better
1341
    for key, inst in nv_dict.iteritems():
1342
      if inst.name not in res_missing:
1343
        res_missing[inst.name] = []
1344
      res_missing[inst.name].append(key)
1345

    
1346
    return result
1347

    
1348

    
1349
class LURenameCluster(LogicalUnit):
1350
  """Rename the cluster.
1351

1352
  """
1353
  HPATH = "cluster-rename"
1354
  HTYPE = constants.HTYPE_CLUSTER
1355
  _OP_REQP = ["name"]
1356

    
1357
  def BuildHooksEnv(self):
1358
    """Build hooks env.
1359

1360
    """
1361
    env = {
1362
      "OP_TARGET": self.cfg.GetClusterName(),
1363
      "NEW_NAME": self.op.name,
1364
      }
1365
    mn = self.cfg.GetMasterNode()
1366
    return env, [mn], [mn]
1367

    
1368
  def CheckPrereq(self):
1369
    """Verify that the passed name is a valid one.
1370

1371
    """
1372
    hostname = utils.HostInfo(self.op.name)
1373

    
1374
    new_name = hostname.name
1375
    self.ip = new_ip = hostname.ip
1376
    old_name = self.cfg.GetClusterName()
1377
    old_ip = self.cfg.GetMasterIP()
1378
    if new_name == old_name and new_ip == old_ip:
1379
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1380
                                 " cluster has changed")
1381
    if new_ip != old_ip:
1382
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1383
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1384
                                   " reachable on the network. Aborting." %
1385
                                   new_ip)
1386

    
1387
    self.op.name = new_name
1388

    
1389
  def Exec(self, feedback_fn):
1390
    """Rename the cluster.
1391

1392
    """
1393
    clustername = self.op.name
1394
    ip = self.ip
1395

    
1396
    # shutdown the master IP
1397
    master = self.cfg.GetMasterNode()
1398
    result = self.rpc.call_node_stop_master(master, False)
1399
    result.Raise("Could not disable the master role")
1400

    
1401
    try:
1402
      cluster = self.cfg.GetClusterInfo()
1403
      cluster.cluster_name = clustername
1404
      cluster.master_ip = ip
1405
      self.cfg.Update(cluster)
1406

    
1407
      # update the known hosts file
1408
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1409
      node_list = self.cfg.GetNodeList()
1410
      try:
1411
        node_list.remove(master)
1412
      except ValueError:
1413
        pass
1414
      result = self.rpc.call_upload_file(node_list,
1415
                                         constants.SSH_KNOWN_HOSTS_FILE)
1416
      for to_node, to_result in result.iteritems():
1417
         msg = to_result.fail_msg
1418
         if msg:
1419
           msg = ("Copy of file %s to node %s failed: %s" %
1420
                   (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1421
           self.proc.LogWarning(msg)
1422

    
1423
    finally:
1424
      result = self.rpc.call_node_start_master(master, False)
1425
      msg = result.fail_msg
1426
      if msg:
1427
        self.LogWarning("Could not re-enable the master role on"
1428
                        " the master, please restart manually: %s", msg)
1429

    
1430

    
1431
def _RecursiveCheckIfLVMBased(disk):
1432
  """Check if the given disk or its children are lvm-based.
1433

1434
  @type disk: L{objects.Disk}
1435
  @param disk: the disk to check
1436
  @rtype: booleean
1437
  @return: boolean indicating whether a LD_LV dev_type was found or not
1438

1439
  """
1440
  if disk.children:
1441
    for chdisk in disk.children:
1442
      if _RecursiveCheckIfLVMBased(chdisk):
1443
        return True
1444
  return disk.dev_type == constants.LD_LV
1445

    
1446

    
1447
class LUSetClusterParams(LogicalUnit):
1448
  """Change the parameters of the cluster.
1449

1450
  """
1451
  HPATH = "cluster-modify"
1452
  HTYPE = constants.HTYPE_CLUSTER
1453
  _OP_REQP = []
1454
  REQ_BGL = False
1455

    
1456
  def CheckArguments(self):
1457
    """Check parameters
1458

1459
    """
1460
    if not hasattr(self.op, "candidate_pool_size"):
1461
      self.op.candidate_pool_size = None
1462
    if self.op.candidate_pool_size is not None:
1463
      try:
1464
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1465
      except (ValueError, TypeError), err:
1466
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1467
                                   str(err))
1468
      if self.op.candidate_pool_size < 1:
1469
        raise errors.OpPrereqError("At least one master candidate needed")
1470

    
1471
  def ExpandNames(self):
1472
    # FIXME: in the future maybe other cluster params won't require checking on
1473
    # all nodes to be modified.
1474
    self.needed_locks = {
1475
      locking.LEVEL_NODE: locking.ALL_SET,
1476
    }
1477
    self.share_locks[locking.LEVEL_NODE] = 1
1478

    
1479
  def BuildHooksEnv(self):
1480
    """Build hooks env.
1481

1482
    """
1483
    env = {
1484
      "OP_TARGET": self.cfg.GetClusterName(),
1485
      "NEW_VG_NAME": self.op.vg_name,
1486
      }
1487
    mn = self.cfg.GetMasterNode()
1488
    return env, [mn], [mn]
1489

    
1490
  def CheckPrereq(self):
1491
    """Check prerequisites.
1492

1493
    This checks whether the given params don't conflict and
1494
    if the given volume group is valid.
1495

1496
    """
1497
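    # an empty (but not None) vg_name signals a request to disable lvm
    # storage, which is only allowed when no lvm-based instances exist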
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
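      # each mstat entry is a (sync_percent, est_time, is_degraded, ldisk)
      # tuple as returned by blockdev_getmirrorstatus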
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
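  # in the blockdev_find result tuple, index 5 holds the overall
  # is_degraded flag and index 6 the local-disk (ldisk) status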
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "")]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    result = self.rpc.call_node_leave_cluster(node.name)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    mc_now, _ = self.cfg.GetMasterCandidateStats()
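    # the new node becomes a master candidate only if the candidate pool
    # is not yet full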
    master_candidate = mc_now < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate,
                                 offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])
    result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload['nodelist']
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node)


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
                        for hypervisor in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2807
  """Checks if a node has enough free memory.
2808

2809
  This function check if a given node has the needed amount of free
2810
  memory. In case the node has less memory or we cannot get the
2811
  information from the node, this function raise an OpPrereqError
2812
  exception.
2813

2814
  @type lu: C{LogicalUnit}
2815
  @param lu: a logical unit from which we get configuration data
2816
  @type node: C{str}
2817
  @param node: the node to check
2818
  @type reason: C{str}
2819
  @param reason: string to use in the error message
2820
  @type requested: C{int}
2821
  @param requested: the amount of memory in MiB to check for
2822
  @type hypervisor_name: C{str}
2823
  @param hypervisor_name: the hypervisor to ask for memory stats
2824
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2825
      we cannot check the node
2826

2827
  """
2828
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2829
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2830
  free_mem = nodeinfo[node].payload.get('memory_free', None)
2831
  if not isinstance(free_mem, int):
2832
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2833
                               " was '%s'" % (node, free_mem))
2834
  if requested > free_mem:
2835
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2836
                               " needed %s MiB, available %s MiB" %
2837
                               (node, reason, requested, free_mem))
2838

    
2839

    
2840
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


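# Editor's illustration -- not part of the original cmdlib.py.  The hvparams
# handling in LUStartupInstance.CheckPrereq layers three dictionaries:
# cluster-level defaults, the instance's own overrides, and the per-start
# overrides passed in the opcode, with later layers winning.  A plain-dict
# sketch of that layering, using a hypothetical helper name:
def _ExampleLayerParams(cluster_defaults, instance_params, start_overrides):
  """Return the effective parameter dict for a single start operation."""
  filled = dict(cluster_defaults)
  filled.update(instance_params)
  filled.update(start_overrides)
  return filled

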
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


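# Editor's illustration -- not part of the original cmdlib.py.  A summary of
# how LURebootInstance.Exec above treats the three accepted reboot types; the
# dictionary name is hypothetical and only documents the branches taken.
_EXAMPLE_REBOOT_BEHAVIOUR = {
  constants.INSTANCE_REBOOT_SOFT: "passed through to the node's hypervisor",
  constants.INSTANCE_REBOOT_HARD: "passed through to the node's hypervisor",
  constants.INSTANCE_REBOOT_FULL: "shutdown, restart the disks, start again",
  }

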
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


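# Editor's note -- not part of the original cmdlib.py.  The LUs in this file
# use two styles for handling RPC results: result.Raise("...") turns a
# failure into an exception immediately (an OpPrereqError when called with
# prereq=True, otherwise an execution error), while checking result.fail_msg
# by hand allows a softer reaction, as LUShutdownInstance.Exec does above:
#
#   msg = result.fail_msg
#   if msg:
#     self.proc.LogWarning("Could not shutdown instance: %s" % msg)

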
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed or result.fail_msg:
          bad_nodes.append(name)
        else:
          if result.payload:
            live_data.update(result.payload)
          # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


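# Editor's illustration -- not part of the original cmdlib.py.  An example of
# the kind of field list LUQueryInstances accepts, mixing static names,
# dynamic names ("status", "oper_ram"), the "be/<param>" prefix and the
# indexed regexp fields declared in _FIELDS_STATIC above.  The variable name
# is hypothetical; the exact hv/be parameter names depend on
# constants.HVS_PARAMETERS and constants.BES_PARAMETERS.
_EXAMPLE_QUERY_FIELDS = ["name", "pnode", "status", "oper_ram",
                         "be/memory", "disk.size/0", "nic.mac/0",
                         "disk.count"]

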
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


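# Editor's note -- not part of the original cmdlib.py.  LUFailoverInstance
# above and LUMigrateInstance below solve the same problem with different
# trade-offs: failover shuts the instance down and starts it on the secondary
# node (any template in constants.DTS_NET_MIRROR is enough), while migrate
# hands the running instance over and therefore requires DT_DRBD8 plus a
# hypervisor that reports the instance as migratable, as their CheckPrereq
# methods verify.

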
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      result.Raise("Can't migrate, please use failover", prereq=True)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        nres.Raise("Cannot resync disks on node %s" % node)
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.fail_msg
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)