root / lib / cmdlib.py @ 9ebe9556


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, an empty list (and not None) should
    be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


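# Illustrative sketch (not part of the original module): a hypothetical
# concurrent LU would typically combine the locking helpers above by
# declaring the instance lock in ExpandNames and recalculating the node
# locks in DeclareLocks; LUHypotheticalOp and its opcode are made-up names.
#
#   class LUHypotheticalOp(LogicalUnit):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
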
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


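# Illustrative sketch (not part of the original module): a minimal NoHooksLU
# subclass only needs CheckPrereq and Exec, since HPATH is None and
# BuildHooksEnv is therefore never called; LUHypotheticalNoop is a made-up
# name.
#
#   class LUHypotheticalNoop(NoHooksLU):
#     _OP_REQP = []
#
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do")
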
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


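# Illustrative sketch (not part of the original module): a node-query LU
# would typically expand an optional self.op.nodes parameter in CheckPrereq
# along these lines (the attribute names are assumptions):
#
#   if self.op.nodes:
#     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
#   else:
#     self.wanted_nodes = None
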
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


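# Illustrative sketch (not part of the original module): a query LU would
# usually validate its output_fields parameter with the helper above, e.g.:
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "mfree"),
#                      selected=self.op.output_fields)
#
# (the concrete field names here are made up for the example).
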
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env


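# Illustrative sketch (not part of the original module): for a single-NIC,
# single-disk instance the function above returns roughly the following
# environment (all values made up):
#
#   {"OP_TARGET": "inst1.example.tld", "INSTANCE_NAME": "inst1.example.tld",
#    "INSTANCE_PRIMARY": "node1.example.tld", "INSTANCE_SECONDARIES": "",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_DISK_TEMPLATE": "plain", "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33", "INSTANCE_DISK_COUNT": 1,
#    "INSTANCE_DISK0_SIZE": 1024, "INSTANCE_DISK0_MODE": "rw"}
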
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


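# Illustrative sketch (not part of the original module): instance-level LUs
# commonly combine the helpers above, e.g. building the hook environment from
# the instance object and checking node/bridge availability in CheckPrereq:
#
#   def BuildHooksEnv(self):
#     env = _BuildInstanceHookEnvByObject(self, self.instance)
#     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
#     return env, nl, nl
#
#   def CheckPrereq(self):
#     ...
#     _CheckNodeOnline(self, self.instance.primary_node)
#     _CheckInstanceBridgesExist(self, self.instance)
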
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        the form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


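# Illustrative sketch (not part of the original module): for a DRBD-backed
# disk whose children are two logical volumes (data and metadata), the
# recursion above descends into the children, finds an LD_LV dev_type and
# returns True; a purely file-based disk would yield False.
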
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
1533
      for hv_name, hv_params in self.new_hvparams.items():
1534
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1535
            (self.op.enabled_hypervisors and
1536
             hv_name in self.op.enabled_hypervisors)):
1537
          # either this is a new hypervisor, or its parameters have changed
1538
          hv_class = hypervisor.GetHypervisor(hv_name)
1539
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1540
          hv_class.CheckParameterSyntax(hv_params)
1541
          _CheckHVParams(self, node_list, hv_name, hv_params)
1542

    
1543
  def Exec(self, feedback_fn):
1544
    """Change the parameters of the cluster.
1545

1546
    """
1547
    if self.op.vg_name is not None:
1548
      new_volume = self.op.vg_name
1549
      if not new_volume:
1550
        new_volume = None
1551
      if new_volume != self.cfg.GetVGName():
1552
        self.cfg.SetVGName(new_volume)
1553
      else:
1554
        feedback_fn("Cluster LVM configuration already in desired"
1555
                    " state, not changing")
1556
    if self.op.hvparams:
1557
      self.cluster.hvparams = self.new_hvparams
1558
    if self.op.enabled_hypervisors is not None:
1559
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1560
    if self.op.beparams:
1561
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1562
    if self.op.nicparams:
1563
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1564

    
1565
    if self.op.candidate_pool_size is not None:
1566
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1567

    
1568
    self.cfg.Update(self.cluster)
1569

    
1570
    # we want to update nodes after the cluster so that if any errors
1571
    # happen, we have recorded and saved the cluster info
1572
    if self.op.candidate_pool_size is not None:
1573
      _AdjustCandidatePool(self)
1574

    
1575

    
1576
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1577
  """Distribute additional files which are part of the cluster configuration.
1578

1579
  ConfigWriter takes care of distributing the config and ssconf files, but
1580
  there are more files which should be distributed to all nodes. This function
1581
  makes sure those are copied.
1582

1583
  @param lu: calling logical unit
1584
  @param additional_nodes: list of nodes not in the config to distribute to
1585

1586
  """
1587
  # 1. Gather target nodes
1588
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1589
  dist_nodes = lu.cfg.GetNodeList()
1590
  if additional_nodes is not None:
1591
    dist_nodes.extend(additional_nodes)
1592
  if myself.name in dist_nodes:
1593
    dist_nodes.remove(myself.name)
1594
  # 2. Gather files to distribute
1595
  dist_files = set([constants.ETC_HOSTS,
1596
                    constants.SSH_KNOWN_HOSTS_FILE,
1597
                    constants.RAPI_CERT_FILE,
1598
                    constants.RAPI_USERS_FILE,
1599
                   ])
1600

    
1601
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1602
  for hv_name in enabled_hypervisors:
1603
    hv_class = hypervisor.GetHypervisor(hv_name)
1604
    dist_files.update(hv_class.GetAncillaryFiles())
1605

    
1606
  # 3. Perform the files upload
1607
  for fname in dist_files:
1608
    if os.path.exists(fname):
1609
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1610
      for to_node, to_result in result.items():
1611
         msg = to_result.RemoteFailMsg()
1612
         if msg:
1613
           msg = ("Copy of file %s to node %s failed: %s" %
1614
                   (fname, to_node, msg))
1615
           lu.proc.LogWarning(msg)
1616

    
1617

    
1618
class LURedistributeConfig(NoHooksLU):
1619
  """Force the redistribution of cluster configuration.
1620

1621
  This is a very simple LU.
1622

1623
  """
1624
  _OP_REQP = []
1625
  REQ_BGL = False
1626

    
1627
  def ExpandNames(self):
1628
    self.needed_locks = {
1629
      locking.LEVEL_NODE: locking.ALL_SET,
1630
    }
1631
    self.share_locks[locking.LEVEL_NODE] = 1
1632

    
1633
  def CheckPrereq(self):
1634
    """Check prerequisites.
1635

1636
    """
1637

    
1638
  def Exec(self, feedback_fn):
1639
    """Redistribute the configuration.
1640

1641
    """
1642
    self.cfg.Update(self.cfg.GetClusterInfo())
1643
    _RedistributeAncillaryFiles(self)
1644

    
1645

    
1646
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1647
  """Sleep and poll for an instance's disk to sync.
1648

1649
  """
1650
  if not instance.disks:
1651
    return True
1652

    
1653
  if not oneshot:
1654
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1655

    
1656
  node = instance.primary_node
1657

    
1658
  for dev in instance.disks:
1659
    lu.cfg.SetDiskID(dev, node)
1660

    
1661
  retries = 0
1662
  while True:
1663
    max_time = 0
1664
    done = True
1665
    cumul_degraded = False
1666
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1667
    if rstats.failed or not rstats.data:
1668
      lu.LogWarning("Can't get any data from node %s", node)
1669
      retries += 1
1670
      if retries >= 10:
1671
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1672
                                 " aborting." % node)
1673
      time.sleep(6)
1674
      continue
1675
    rstats = rstats.data
1676
    retries = 0
1677
    for i, mstat in enumerate(rstats):
1678
      if mstat is None:
1679
        lu.LogWarning("Can't compute data for node %s/%s",
1680
                           node, instance.disks[i].iv_name)
1681
        continue
1682
      # we ignore the ldisk parameter
1683
      perc_done, est_time, is_degraded, _ = mstat
1684
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1685
      if perc_done is not None:
1686
        done = False
1687
        if est_time is not None:
1688
          rem_time = "%d estimated seconds remaining" % est_time
1689
          max_time = est_time
1690
        else:
1691
          rem_time = "no time estimate"
1692
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1693
                        (instance.disks[i].iv_name, perc_done, rem_time))
1694
    if done or oneshot:
1695
      break
1696

    
1697
    time.sleep(min(60, max_time))
1698

    
1699
  if done:
1700
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1701
  return not cumul_degraded
1702

    
1703

    
1704
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1705
  """Check that mirrors are not degraded.
1706

1707
  The ldisk parameter, if True, will change the test from the
1708
  is_degraded attribute (which represents overall non-ok status for
1709
  the device(s)) to the ldisk (representing the local storage status).
1710

1711
  """
1712
  lu.cfg.SetDiskID(dev, node)
1713
  if ldisk:
1714
    idx = 6
1715
  else:
1716
    idx = 5
1717

    
1718
  result = True
1719
  if on_primary or dev.AssembleOnSecondary():
1720
    rstats = lu.rpc.call_blockdev_find(node, dev)
1721
    msg = rstats.RemoteFailMsg()
1722
    if msg:
1723
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1724
      result = False
1725
    elif not rstats.payload:
1726
      lu.LogWarning("Can't find disk on node %s", node)
1727
      result = False
1728
    else:
1729
      result = result and (not rstats.payload[idx])
1730
  if dev.children:
1731
    for child in dev.children:
1732
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1733

    
1734
  return result
1735

    
1736

    
1737
class LUDiagnoseOS(NoHooksLU):
1738
  """Logical unit for OS diagnose/query.
1739

1740
  """
1741
  _OP_REQP = ["output_fields", "names"]
1742
  REQ_BGL = False
1743
  _FIELDS_STATIC = utils.FieldSet()
1744
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1745

    
1746
  def ExpandNames(self):
1747
    if self.op.names:
1748
      raise errors.OpPrereqError("Selective OS query not supported")
1749

    
1750
    _CheckOutputFields(static=self._FIELDS_STATIC,
1751
                       dynamic=self._FIELDS_DYNAMIC,
1752
                       selected=self.op.output_fields)
1753

    
1754
    # Lock all nodes, in shared mode
1755
    # Temporary removal of locks, should be reverted later
1756
    # TODO: reintroduce locks when they are lighter-weight
1757
    self.needed_locks = {}
1758
    #self.share_locks[locking.LEVEL_NODE] = 1
1759
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1760

    
1761
  def CheckPrereq(self):
1762
    """Check prerequisites.
1763

1764
    """
1765

    
1766
  @staticmethod
1767
  def _DiagnoseByOS(node_list, rlist):
1768
    """Remaps a per-node return list into an a per-os per-node dictionary
1769

1770
    @param node_list: a list with the names of all nodes
1771
    @param rlist: a map with node names as keys and OS objects as values
1772

1773
    @rtype: dict
1774
    @return: a dictionary with osnames as keys and as value another map, with
1775
        nodes as keys and list of OS objects as values, eg::
1776

1777
          {"debian-etch": {"node1": [<object>,...],
1778
                           "node2": [<object>,]}
1779
          }
1780

1781
    """
1782
    all_os = {}
1783
    # we build here the list of nodes that didn't fail the RPC (at RPC
1784
    # level), so that nodes with a non-responding node daemon don't
1785
    # make all OSes invalid
1786
    good_nodes = [node_name for node_name in rlist
1787
                  if not rlist[node_name].failed]
1788
    for node_name, nr in rlist.iteritems():
1789
      if nr.failed or not nr.data:
1790
        continue
1791
      for os_obj in nr.data:
1792
        if os_obj.name not in all_os:
1793
          # build a list of nodes for this os containing empty lists
1794
          # for each node in node_list
1795
          all_os[os_obj.name] = {}
1796
          for nname in good_nodes:
1797
            all_os[os_obj.name][nname] = []
1798
        all_os[os_obj.name][node_name].append(os_obj)
1799
    return all_os
1800

    
1801
  def Exec(self, feedback_fn):
1802
    """Compute the list of OSes.
1803

1804
    """
1805
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1806
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1807
    if node_data == False:
1808
      raise errors.OpExecError("Can't gather the list of OSes")
1809
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1810
    output = []
1811
    for os_name, os_data in pol.iteritems():
1812
      row = []
1813
      for field in self.op.output_fields:
1814
        if field == "name":
1815
          val = os_name
1816
        elif field == "valid":
1817
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1818
        elif field == "node_status":
1819
          val = {}
1820
          for node_name, nos_list in os_data.iteritems():
1821
            val[node_name] = [(v.status, v.path) for v in nos_list]
1822
        else:
1823
          raise errors.ParameterError(field)
1824
        row.append(val)
1825
      output.append(row)
1826

    
1827
    return output
1828

    
1829

    
1830
class LURemoveNode(LogicalUnit):
1831
  """Logical unit for removing a node.
1832

1833
  """
1834
  HPATH = "node-remove"
1835
  HTYPE = constants.HTYPE_NODE
1836
  _OP_REQP = ["node_name"]
1837

    
1838
  def BuildHooksEnv(self):
1839
    """Build hooks env.
1840

1841
    This doesn't run on the target node in the pre phase as a failed
1842
    node would then be impossible to remove.
1843

1844
    """
1845
    env = {
1846
      "OP_TARGET": self.op.node_name,
1847
      "NODE_NAME": self.op.node_name,
1848
      }
1849
    all_nodes = self.cfg.GetNodeList()
1850
    all_nodes.remove(self.op.node_name)
1851
    return env, all_nodes, all_nodes
1852

    
1853
  def CheckPrereq(self):
1854
    """Check prerequisites.
1855

1856
    This checks:
1857
     - the node exists in the configuration
1858
     - it does not have primary or secondary instances
1859
     - it's not the master
1860

1861
    Any errors are signalled by raising errors.OpPrereqError.
1862

1863
    """
1864
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1865
    if node is None:
1866
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1867

    
1868
    instance_list = self.cfg.GetInstanceList()
1869

    
1870
    masternode = self.cfg.GetMasterNode()
1871
    if node.name == masternode:
1872
      raise errors.OpPrereqError("Node is the master node,"
1873
                                 " you need to failover first.")
1874

    
1875
    for instance_name in instance_list:
1876
      instance = self.cfg.GetInstanceInfo(instance_name)
1877
      if node.name in instance.all_nodes:
1878
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1879
                                   " please remove first." % instance_name)
1880
    self.op.node_name = node.name
1881
    self.node = node
1882

    
1883
  def Exec(self, feedback_fn):
1884
    """Removes the node from the cluster.
1885

1886
    """
1887
    node = self.node
1888
    logging.info("Stopping the node daemon and removing configs from node %s",
1889
                 node.name)
1890

    
1891
    self.context.RemoveNode(node.name)
1892

    
1893
    self.rpc.call_node_leave_cluster(node.name)
1894

    
1895
    # Promote nodes to master candidate as needed
1896
    _AdjustCandidatePool(self)
1897

    
1898

    
1899
class LUQueryNodes(NoHooksLU):
1900
  """Logical unit for querying nodes.
1901

1902
  """
1903
  _OP_REQP = ["output_fields", "names", "use_locking"]
1904
  REQ_BGL = False
1905
  _FIELDS_DYNAMIC = utils.FieldSet(
1906
    "dtotal", "dfree",
1907
    "mtotal", "mnode", "mfree",
1908
    "bootid",
1909
    "ctotal", "cnodes", "csockets",
1910
    )
1911

    
1912
  _FIELDS_STATIC = utils.FieldSet(
1913
    "name", "pinst_cnt", "sinst_cnt",
1914
    "pinst_list", "sinst_list",
1915
    "pip", "sip", "tags",
1916
    "serial_no",
1917
    "master_candidate",
1918
    "master",
1919
    "offline",
1920
    "drained",
1921
    )
1922

    
1923
  def ExpandNames(self):
1924
    _CheckOutputFields(static=self._FIELDS_STATIC,
1925
                       dynamic=self._FIELDS_DYNAMIC,
1926
                       selected=self.op.output_fields)
1927

    
1928
    self.needed_locks = {}
1929
    self.share_locks[locking.LEVEL_NODE] = 1
1930

    
1931
    if self.op.names:
1932
      self.wanted = _GetWantedNodes(self, self.op.names)
1933
    else:
1934
      self.wanted = locking.ALL_SET
1935

    
1936
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1937
    self.do_locking = self.do_node_query and self.op.use_locking
1938
    if self.do_locking:
1939
      # if we don't request only static fields, we need to lock the nodes
1940
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1941

    
1942

    
1943
  def CheckPrereq(self):
1944
    """Check prerequisites.
1945

1946
    """
1947
    # The validation of the node list is done in the _GetWantedNodes,
1948
    # if non empty, and if empty, there's no validation to do
1949
    pass
1950

    
1951
  def Exec(self, feedback_fn):
1952
    """Computes the list of nodes and their attributes.
1953

1954
    """
1955
    all_info = self.cfg.GetAllNodesInfo()
1956
    if self.do_locking:
1957
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1958
    elif self.wanted != locking.ALL_SET:
1959
      nodenames = self.wanted
1960
      missing = set(nodenames).difference(all_info.keys())
1961
      if missing:
1962
        raise errors.OpExecError(
1963
          "Some nodes were removed before retrieving their data: %s" % missing)
1964
    else:
1965
      nodenames = all_info.keys()
1966

    
1967
    nodenames = utils.NiceSort(nodenames)
1968
    nodelist = [all_info[name] for name in nodenames]
1969

    
1970
    # begin data gathering
1971

    
1972
    if self.do_node_query:
1973
      live_data = {}
1974
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1975
                                          self.cfg.GetHypervisorType())
1976
      for name in nodenames:
1977
        nodeinfo = node_data[name]
1978
        if not nodeinfo.failed and nodeinfo.data:
1979
          nodeinfo = nodeinfo.data
1980
          fn = utils.TryConvert
1981
          live_data[name] = {
1982
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1983
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1984
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1985
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1986
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1987
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1988
            "bootid": nodeinfo.get('bootid', None),
1989
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1990
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1991
            }
1992
        else:
1993
          live_data[name] = {}
1994
    else:
1995
      live_data = dict.fromkeys(nodenames, {})
1996

    
1997
    node_to_primary = dict([(name, set()) for name in nodenames])
1998
    node_to_secondary = dict([(name, set()) for name in nodenames])
1999

    
2000
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2001
                             "sinst_cnt", "sinst_list"))
2002
    if inst_fields & frozenset(self.op.output_fields):
2003
      instancelist = self.cfg.GetInstanceList()
2004

    
2005
      for instance_name in instancelist:
2006
        inst = self.cfg.GetInstanceInfo(instance_name)
2007
        if inst.primary_node in node_to_primary:
2008
          node_to_primary[inst.primary_node].add(inst.name)
2009
        for secnode in inst.secondary_nodes:
2010
          if secnode in node_to_secondary:
2011
            node_to_secondary[secnode].add(inst.name)
2012

    
2013
    master_node = self.cfg.GetMasterNode()
2014

    
2015
    # end data gathering
2016

    
2017
    output = []
2018
    for node in nodelist:
2019
      node_output = []
2020
      for field in self.op.output_fields:
2021
        if field == "name":
2022
          val = node.name
2023
        elif field == "pinst_list":
2024
          val = list(node_to_primary[node.name])
2025
        elif field == "sinst_list":
2026
          val = list(node_to_secondary[node.name])
2027
        elif field == "pinst_cnt":
2028
          val = len(node_to_primary[node.name])
2029
        elif field == "sinst_cnt":
2030
          val = len(node_to_secondary[node.name])
2031
        elif field == "pip":
2032
          val = node.primary_ip
2033
        elif field == "sip":
2034
          val = node.secondary_ip
2035
        elif field == "tags":
2036
          val = list(node.GetTags())
2037
        elif field == "serial_no":
2038
          val = node.serial_no
2039
        elif field == "master_candidate":
2040
          val = node.master_candidate
2041
        elif field == "master":
2042
          val = node.name == master_node
2043
        elif field == "offline":
2044
          val = node.offline
2045
        elif field == "drained":
2046
          val = node.drained
2047
        elif self._FIELDS_DYNAMIC.Matches(field):
2048
          val = live_data[node.name].get(field, None)
2049
        else:
2050
          raise errors.ParameterError(field)
2051
        node_output.append(val)
2052
      output.append(node_output)
2053

    
2054
    return output
2055

    
2056

    
2057
class LUQueryNodeVolumes(NoHooksLU):
2058
  """Logical unit for getting volumes on node(s).
2059

2060
  """
2061
  _OP_REQP = ["nodes", "output_fields"]
2062
  REQ_BGL = False
2063
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2064
  _FIELDS_STATIC = utils.FieldSet("node")
2065

    
2066
  def ExpandNames(self):
2067
    _CheckOutputFields(static=self._FIELDS_STATIC,
2068
                       dynamic=self._FIELDS_DYNAMIC,
2069
                       selected=self.op.output_fields)
2070

    
2071
    self.needed_locks = {}
2072
    self.share_locks[locking.LEVEL_NODE] = 1
2073
    if not self.op.nodes:
2074
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2075
    else:
2076
      self.needed_locks[locking.LEVEL_NODE] = \
2077
        _GetWantedNodes(self, self.op.nodes)
2078

    
2079
  def CheckPrereq(self):
2080
    """Check prerequisites.
2081

2082
    This checks that the fields required are valid output fields.
2083

2084
    """
2085
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2086

    
2087
  def Exec(self, feedback_fn):
2088
    """Computes the list of nodes and their attributes.
2089

2090
    """
2091
    nodenames = self.nodes
2092
    volumes = self.rpc.call_node_volumes(nodenames)
2093

    
2094
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2095
             in self.cfg.GetInstanceList()]
2096

    
2097
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2098

    
2099
    output = []
2100
    for node in nodenames:
2101
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2102
        continue
2103

    
2104
      node_vols = volumes[node].data[:]
2105
      node_vols.sort(key=lambda vol: vol['dev'])
2106

    
2107
      for vol in node_vols:
2108
        node_output = []
2109
        for field in self.op.output_fields:
2110
          if field == "node":
2111
            val = node
2112
          elif field == "phys":
2113
            val = vol['dev']
2114
          elif field == "vg":
2115
            val = vol['vg']
2116
          elif field == "name":
2117
            val = vol['name']
2118
          elif field == "size":
2119
            val = int(float(vol['size']))
2120
          elif field == "instance":
2121
            for inst in ilist:
2122
              if node not in lv_by_node[inst]:
2123
                continue
2124
              if vol['name'] in lv_by_node[inst][node]:
2125
                val = inst.name
2126
                break
2127
            else:
2128
              val = '-'
2129
          else:
2130
            raise errors.ParameterError(field)
2131
          node_output.append(str(val))
2132

    
2133
        output.append(node_output)
2134

    
2135
    return output
2136

    
2137

    
2138
class LUAddNode(LogicalUnit):
2139
  """Logical unit for adding node to the cluster.
2140

2141
  """
2142
  HPATH = "node-add"
2143
  HTYPE = constants.HTYPE_NODE
2144
  _OP_REQP = ["node_name"]
2145

    
2146
  def BuildHooksEnv(self):
2147
    """Build hooks env.
2148

2149
    This will run on all nodes before, and on all nodes + the new node after.
2150

2151
    """
2152
    env = {
2153
      "OP_TARGET": self.op.node_name,
2154
      "NODE_NAME": self.op.node_name,
2155
      "NODE_PIP": self.op.primary_ip,
2156
      "NODE_SIP": self.op.secondary_ip,
2157
      }
2158
    nodes_0 = self.cfg.GetNodeList()
2159
    nodes_1 = nodes_0 + [self.op.node_name, ]
2160
    return env, nodes_0, nodes_1
2161

    
2162
  def CheckPrereq(self):
2163
    """Check prerequisites.
2164

2165
    This checks:
2166
     - the new node is not already in the config
2167
     - it is resolvable
2168
     - its parameters (single/dual homed) matches the cluster
2169

2170
    Any errors are signalled by raising errors.OpPrereqError.
2171

2172
    """
2173
    node_name = self.op.node_name
2174
    cfg = self.cfg
2175

    
2176
    dns_data = utils.HostInfo(node_name)
2177

    
2178
    node = dns_data.name
2179
    primary_ip = self.op.primary_ip = dns_data.ip
2180
    secondary_ip = getattr(self.op, "secondary_ip", None)
2181
    if secondary_ip is None:
2182
      secondary_ip = primary_ip
2183
    if not utils.IsValidIP(secondary_ip):
2184
      raise errors.OpPrereqError("Invalid secondary IP given")
2185
    self.op.secondary_ip = secondary_ip
2186

    
2187
    node_list = cfg.GetNodeList()
2188
    if not self.op.readd and node in node_list:
2189
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2190
                                 node)
2191
    elif self.op.readd and node not in node_list:
2192
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2193

    
2194
    for existing_node_name in node_list:
2195
      existing_node = cfg.GetNodeInfo(existing_node_name)
2196

    
2197
      if self.op.readd and node == existing_node_name:
2198
        if (existing_node.primary_ip != primary_ip or
2199
            existing_node.secondary_ip != secondary_ip):
2200
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2201
                                     " address configuration as before")
2202
        continue
2203

    
2204
      if (existing_node.primary_ip == primary_ip or
2205
          existing_node.secondary_ip == primary_ip or
2206
          existing_node.primary_ip == secondary_ip or
2207
          existing_node.secondary_ip == secondary_ip):
2208
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2209
                                   " existing node %s" % existing_node.name)
2210

    
2211
    # check that the type of the node (single versus dual homed) is the
2212
    # same as for the master
2213
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2214
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2215
    newbie_singlehomed = secondary_ip == primary_ip
2216
    if master_singlehomed != newbie_singlehomed:
2217
      if master_singlehomed:
2218
        raise errors.OpPrereqError("The master has no private ip but the"
2219
                                   " new node has one")
2220
      else:
2221
        raise errors.OpPrereqError("The master has a private ip but the"
2222
                                   " new node doesn't have one")
2223

    
2224
    # checks reachablity
2225
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2226
      raise errors.OpPrereqError("Node not reachable by ping")
2227

    
2228
    if not newbie_singlehomed:
2229
      # check reachability from my secondary ip to newbie's secondary ip
2230
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2231
                           source=myself.secondary_ip):
2232
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2233
                                   " based ping to noded port")
2234

    
2235
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2236
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2237
    master_candidate = mc_now < cp_size
2238

    
2239
    self.new_node = objects.Node(name=node,
2240
                                 primary_ip=primary_ip,
2241
                                 secondary_ip=secondary_ip,
2242
                                 master_candidate=master_candidate,
2243
                                 offline=False, drained=False)
2244

    
2245
  def Exec(self, feedback_fn):
2246
    """Adds the new node to the cluster.
2247

2248
    """
2249
    new_node = self.new_node
2250
    node = new_node.name
2251

    
2252
    # check connectivity
2253
    result = self.rpc.call_version([node])[node]
2254
    result.Raise()
2255
    if result.data:
2256
      if constants.PROTOCOL_VERSION == result.data:
2257
        logging.info("Communication to node %s fine, sw version %s match",
2258
                     node, result.data)
2259
      else:
2260
        raise errors.OpExecError("Version mismatch master version %s,"
2261
                                 " node version %s" %
2262
                                 (constants.PROTOCOL_VERSION, result.data))
2263
    else:
2264
      raise errors.OpExecError("Cannot get version from the new node")
2265

    
2266
    # setup ssh on node
2267
    logging.info("Copy ssh key to node %s", node)
2268
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2269
    keyarray = []
2270
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2271
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2272
                priv_key, pub_key]
2273

    
2274
    for i in keyfiles:
2275
      f = open(i, 'r')
2276
      try:
2277
        keyarray.append(f.read())
2278
      finally:
2279
        f.close()
2280

    
2281
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2282
                                    keyarray[2],
2283
                                    keyarray[3], keyarray[4], keyarray[5])
2284

    
2285
    msg = result.RemoteFailMsg()
2286
    if msg:
2287
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2288
                               " new node: %s" % msg)
2289

    
2290
    # Add node to our /etc/hosts, and add key to known_hosts
2291
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2292
      utils.AddHostToEtcHosts(new_node.name)
2293

    
2294
    if new_node.secondary_ip != new_node.primary_ip:
2295
      result = self.rpc.call_node_has_ip_address(new_node.name,
2296
                                                 new_node.secondary_ip)
2297
      if result.failed or not result.data:
2298
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2299
                                 " you gave (%s). Please fix and re-run this"
2300
                                 " command." % new_node.secondary_ip)
2301

    
2302
    node_verify_list = [self.cfg.GetMasterNode()]
2303
    node_verify_param = {
2304
      'nodelist': [node],
2305
      # TODO: do a node-net-test as well?
2306
    }
2307

    
2308
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2309
                                       self.cfg.GetClusterName())
2310
    for verifier in node_verify_list:
2311
      if result[verifier].failed or not result[verifier].data:
2312
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2313
                                 " for remote verification" % verifier)
2314
      if result[verifier].data['nodelist']:
2315
        for failed in result[verifier].data['nodelist']:
2316
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2317
                      (verifier, result[verifier].data['nodelist'][failed]))
2318
        raise errors.OpExecError("ssh/hostname verification failed.")
2319

    
2320
    if self.op.readd:
2321
      _RedistributeAncillaryFiles(self)
2322
      self.context.ReaddNode(new_node)
2323
    else:
2324
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2325
      self.context.AddNode(new_node)
2326

    
2327

    
2328
class LUSetNodeParams(LogicalUnit):
2329
  """Modifies the parameters of a node.
2330

2331
  """
2332
  HPATH = "node-modify"
2333
  HTYPE = constants.HTYPE_NODE
2334
  _OP_REQP = ["node_name"]
2335
  REQ_BGL = False
2336

    
2337
  def CheckArguments(self):
2338
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2339
    if node_name is None:
2340
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2341
    self.op.node_name = node_name
2342
    _CheckBooleanOpField(self.op, 'master_candidate')
2343
    _CheckBooleanOpField(self.op, 'offline')
2344
    _CheckBooleanOpField(self.op, 'drained')
2345
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2346
    if all_mods.count(None) == 3:
2347
      raise errors.OpPrereqError("Please pass at least one modification")
2348
    if all_mods.count(True) > 1:
2349
      raise errors.OpPrereqError("Can't set the node into more than one"
2350
                                 " state at the same time")
2351

    
2352
  def ExpandNames(self):
2353
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2354

    
2355
  def BuildHooksEnv(self):
2356
    """Build hooks env.
2357

2358
    This runs on the master node.
2359

2360
    """
2361
    env = {
2362
      "OP_TARGET": self.op.node_name,
2363
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2364
      "OFFLINE": str(self.op.offline),
2365
      "DRAINED": str(self.op.drained),
2366
      }
2367
    nl = [self.cfg.GetMasterNode(),
2368
          self.op.node_name]
2369
    return env, nl, nl
2370

    
2371
  def CheckPrereq(self):
2372
    """Check prerequisites.
2373

2374
    This only checks the instance list against the existing names.
2375

2376
    """
2377
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2378

    
2379
    if ((self.op.master_candidate == False or self.op.offline == True or
2380
         self.op.drained == True) and node.master_candidate):
2381
      # we will demote the node from master_candidate
2382
      if self.op.node_name == self.cfg.GetMasterNode():
2383
        raise errors.OpPrereqError("The master node has to be a"
2384
                                   " master candidate, online and not drained")
2385
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2386
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2387
      if num_candidates <= cp_size:
2388
        msg = ("Not enough master candidates (desired"
2389
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2390
        if self.op.force:
2391
          self.LogWarning(msg)
2392
        else:
2393
          raise errors.OpPrereqError(msg)
2394

    
2395
    if (self.op.master_candidate == True and
2396
        ((node.offline and not self.op.offline == False) or
2397
         (node.drained and not self.op.drained == False))):
2398
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2399
                                 " to master_candidate" % node.name)
2400

    
2401
    return
2402

    
2403
  def Exec(self, feedback_fn):
2404
    """Modifies a node.
2405

2406
    """
2407
    node = self.node
2408

    
2409
    result = []
2410
    changed_mc = False
2411

    
2412
    if self.op.offline is not None:
2413
      node.offline = self.op.offline
2414
      result.append(("offline", str(self.op.offline)))
2415
      if self.op.offline == True:
2416
        if node.master_candidate:
2417
          node.master_candidate = False
2418
          changed_mc = True
2419
          result.append(("master_candidate", "auto-demotion due to offline"))
2420
        if node.drained:
2421
          node.drained = False
2422
          result.append(("drained", "clear drained status due to offline"))
2423

    
2424
    if self.op.master_candidate is not None:
2425
      node.master_candidate = self.op.master_candidate
2426
      changed_mc = True
2427
      result.append(("master_candidate", str(self.op.master_candidate)))
2428
      if self.op.master_candidate == False:
2429
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2430
        msg = rrc.RemoteFailMsg()
2431
        if msg:
2432
          self.LogWarning("Node failed to demote itself: %s" % msg)
2433

    
2434
    if self.op.drained is not None:
2435
      node.drained = self.op.drained
2436
      result.append(("drained", str(self.op.drained)))
2437
      if self.op.drained == True:
2438
        if node.master_candidate:
2439
          node.master_candidate = False
2440
          changed_mc = True
2441
          result.append(("master_candidate", "auto-demotion due to drain"))
2442
        if node.offline:
2443
          node.offline = False
2444
          result.append(("offline", "clear offline status due to drain"))
2445

    
2446
    # this will trigger configuration file update, if needed
2447
    self.cfg.Update(node)
2448
    # this will trigger job queue propagation or cleanup
2449
    if changed_mc:
2450
      self.context.ReaddNode(node)
2451

    
2452
    return result
2453

    
2454

    
2455
class LUPowercycleNode(NoHooksLU):
2456
  """Powercycles a node.
2457

2458
  """
2459
  _OP_REQP = ["node_name", "force"]
2460
  REQ_BGL = False
2461

    
2462
  def CheckArguments(self):
2463
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2464
    if node_name is None:
2465
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2466
    self.op.node_name = node_name
2467
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
2468
      raise errors.OpPrereqError("The node is the master and the force"
2469
                                 " parameter was not set")
2470

    
2471
  def ExpandNames(self):
2472
    """Locking for PowercycleNode.
2473

2474
    This is a last-resource option and shouldn't block on other
2475
    jobs. Therefore, we grab no locks.
2476

2477
    """
2478
    self.needed_locks = {}
2479

    
2480
  def CheckPrereq(self):
2481
    """Check prerequisites.
2482

2483
    This LU has no prereqs.
2484

2485
    """
2486
    pass
2487

    
2488
  def Exec(self, feedback_fn):
2489
    """Reboots a node.
2490

2491
    """
2492
    result = self.rpc.call_node_powercycle(self.op.node_name,
2493
                                           self.cfg.GetHypervisorType())
2494
    msg = result.RemoteFailMsg()
2495
    if msg:
2496
      raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2497
    return result.payload
2498

    
2499

    
2500
class LUQueryClusterInfo(NoHooksLU):
2501
  """Query cluster configuration.
2502

2503
  """
2504
  _OP_REQP = []
2505
  REQ_BGL = False
2506

    
2507
  def ExpandNames(self):
2508
    self.needed_locks = {}
2509

    
2510
  def CheckPrereq(self):
2511
    """No prerequsites needed for this LU.
2512

2513
    """
2514
    pass
2515

    
2516
  def Exec(self, feedback_fn):
2517
    """Return cluster config.
2518

2519
    """
2520
    cluster = self.cfg.GetClusterInfo()
2521
    result = {
2522
      "software_version": constants.RELEASE_VERSION,
2523
      "protocol_version": constants.PROTOCOL_VERSION,
2524
      "config_version": constants.CONFIG_VERSION,
2525
      "os_api_version": constants.OS_API_VERSION,
2526
      "export_version": constants.EXPORT_VERSION,
2527
      "architecture": (platform.architecture()[0], platform.machine()),
2528
      "name": cluster.cluster_name,
2529
      "master": cluster.master_node,
2530
      "default_hypervisor": cluster.default_hypervisor,
2531
      "enabled_hypervisors": cluster.enabled_hypervisors,
2532
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2533
                        for hypervisor in cluster.enabled_hypervisors]),
2534
      "beparams": cluster.beparams,
2535
      "nicparams": cluster.nicparams,
2536
      "candidate_pool_size": cluster.candidate_pool_size,
2537
      "master_netdev": cluster.master_netdev,
2538
      "volume_group_name": cluster.volume_group_name,
2539
      "file_storage_dir": cluster.file_storage_dir,
2540
      }
2541

    
2542
    return result
2543

    
2544

    
2545
class LUQueryConfigValues(NoHooksLU):
2546
  """Return configuration values.
2547

2548
  """
2549
  _OP_REQP = []
2550
  REQ_BGL = False
2551
  _FIELDS_DYNAMIC = utils.FieldSet()
2552
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2553

    
2554
  def ExpandNames(self):
2555
    self.needed_locks = {}
2556

    
2557
    _CheckOutputFields(static=self._FIELDS_STATIC,
2558
                       dynamic=self._FIELDS_DYNAMIC,
2559
                       selected=self.op.output_fields)
2560

    
2561
  def CheckPrereq(self):
2562
    """No prerequisites.
2563

2564
    """
2565
    pass
2566

    
2567
  def Exec(self, feedback_fn):
2568
    """Dump a representation of the cluster config to the standard output.
2569

2570
    """
2571
    values = []
2572
    for field in self.op.output_fields:
2573
      if field == "cluster_name":
2574
        entry = self.cfg.GetClusterName()
2575
      elif field == "master_node":
2576
        entry = self.cfg.GetMasterNode()
2577
      elif field == "drain_flag":
2578
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2579
      else:
2580
        raise errors.ParameterError(field)
2581
      values.append(entry)
2582
    return values
2583

    
2584

    
2585
class LUActivateInstanceDisks(NoHooksLU):
2586
  """Bring up an instance's disks.
2587

2588
  """
2589
  _OP_REQP = ["instance_name"]
2590
  REQ_BGL = False
2591

    
2592
  def ExpandNames(self):
2593
    self._ExpandAndLockInstance()
2594
    self.needed_locks[locking.LEVEL_NODE] = []
2595
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2596

    
2597
  def DeclareLocks(self, level):
2598
    if level == locking.LEVEL_NODE:
2599
      self._LockInstancesNodes()
2600

    
2601
  def CheckPrereq(self):
2602
    """Check prerequisites.
2603

2604
    This checks that the instance is in the cluster.
2605

2606
    """
2607
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2608
    assert self.instance is not None, \
2609
      "Cannot retrieve locked instance %s" % self.op.instance_name
2610
    _CheckNodeOnline(self, self.instance.primary_node)
2611

    
2612
  def Exec(self, feedback_fn):
2613
    """Activate the disks.
2614

2615
    """
2616
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2617
    if not disks_ok:
2618
      raise errors.OpExecError("Cannot activate block devices")
2619

    
2620
    return disks_info
2621

    
2622

    
2623
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2624
  """Prepare the block devices for an instance.
2625

2626
  This sets up the block devices on all nodes.
2627

2628
  @type lu: L{LogicalUnit}
2629
  @param lu: the logical unit on whose behalf we execute
2630
  @type instance: L{objects.Instance}
2631
  @param instance: the instance for whose disks we assemble
2632
  @type ignore_secondaries: boolean
2633
  @param ignore_secondaries: if true, errors on secondary nodes
2634
      won't result in an error return from the function
2635
  @return: False if the operation failed, otherwise a list of
2636
      (host, instance_visible_name, node_visible_name)
2637
      with the mapping from node devices to instance devices
2638

2639
  """
2640
  device_info = []
2641
  disks_ok = True
2642
  iname = instance.name
2643
  # With the two passes mechanism we try to reduce the window of
2644
  # opportunity for the race condition of switching DRBD to primary
2645
  # before handshaking occured, but we do not eliminate it
2646

    
2647
  # The proper fix would be to wait (with some limits) until the
2648
  # connection has been made and drbd transitions from WFConnection
2649
  # into any other network-connected state (Connected, SyncTarget,
2650
  # SyncSource, etc.)
2651

    
2652
  # 1st pass, assemble on all nodes in secondary mode
2653
  for inst_disk in instance.disks:
2654
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2655
      lu.cfg.SetDiskID(node_disk, node)
2656
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2657
      msg = result.RemoteFailMsg()
2658
      if msg:
2659
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2660
                           " (is_primary=False, pass=1): %s",
2661
                           inst_disk.iv_name, node, msg)
2662
        if not ignore_secondaries:
2663
          disks_ok = False
2664

    
2665
  # FIXME: race condition on drbd migration to primary
2666

    
2667
  # 2nd pass, do only the primary node
2668
  for inst_disk in instance.disks:
2669
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2670
      if node != instance.primary_node:
2671
        continue
2672
      lu.cfg.SetDiskID(node_disk, node)
2673
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2674
      msg = result.RemoteFailMsg()
2675
      if msg:
2676
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2677
                           " (is_primary=True, pass=2): %s",
2678
                           inst_disk.iv_name, node, msg)
2679
        disks_ok = False
2680
    device_info.append((instance.primary_node, inst_disk.iv_name,
2681
                        result.payload))
2682

    
2683
  # leave the disks configured for the primary node
2684
  # this is a workaround that would be fixed better by
2685
  # improving the logical/physical id handling
2686
  for disk in instance.disks:
2687
    lu.cfg.SetDiskID(disk, instance.primary_node)
2688

    
2689
  return disks_ok, device_info
2690

    
2691

    
2692
def _StartInstanceDisks(lu, instance, force):
2693
  """Start the disks of an instance.
2694

2695
  """
2696
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2697
                                           ignore_secondaries=force)
2698
  if not disks_ok:
2699
    _ShutdownInstanceDisks(lu, instance)
2700
    if force is not None and not force:
2701
      lu.proc.LogWarning("", hint="If the message above refers to a"
2702
                         " secondary node,"
2703
                         " you can retry the operation using '--force'.")
2704
    raise errors.OpExecError("Disk consistency error")
2705

    
2706

    
2707
class LUDeactivateInstanceDisks(NoHooksLU):
2708
  """Shutdown an instance's disks.
2709

2710
  """
2711
  _OP_REQP = ["instance_name"]
2712
  REQ_BGL = False
2713

    
2714
  def ExpandNames(self):
2715
    self._ExpandAndLockInstance()
2716
    self.needed_locks[locking.LEVEL_NODE] = []
2717
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2718

    
2719
  def DeclareLocks(self, level):
2720
    if level == locking.LEVEL_NODE:
2721
      self._LockInstancesNodes()
2722

    
2723
  def CheckPrereq(self):
2724
    """Check prerequisites.
2725

2726
    This checks that the instance is in the cluster.
2727

2728
    """
2729
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2730
    assert self.instance is not None, \
2731
      "Cannot retrieve locked instance %s" % self.op.instance_name
2732

    
2733
  def Exec(self, feedback_fn):
2734
    """Deactivate the disks
2735

2736
    """
2737
    instance = self.instance
2738
    _SafeShutdownInstanceDisks(self, instance)
2739

    
2740

    
2741
def _SafeShutdownInstanceDisks(lu, instance):
2742
  """Shutdown block devices of an instance.
2743

2744
  This function checks if an instance is running, before calling
2745
  _ShutdownInstanceDisks.
2746

2747
  """
2748
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2749
                                      [instance.hypervisor])
2750
  ins_l = ins_l[instance.primary_node]
2751
  if ins_l.failed or not isinstance(ins_l.data, list):
2752
    raise errors.OpExecError("Can't contact node '%s'" %
2753
                             instance.primary_node)
2754

    
2755
  if instance.name in ins_l.data:
2756
    raise errors.OpExecError("Instance is running, can't shutdown"
2757
                             " block devices.")
2758

    
2759
  _ShutdownInstanceDisks(lu, instance)
2760

    
2761

    
2762
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2763
  """Shutdown block devices of an instance.
2764

2765
  This does the shutdown on all nodes of the instance.
2766

2767
  If the ignore_primary is false, errors on the primary node are
2768
  ignored.
2769

2770
  """
2771
  all_result = True
2772
  for disk in instance.disks:
2773
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2774
      lu.cfg.SetDiskID(top_disk, node)
2775
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2776
      msg = result.RemoteFailMsg()
2777
      if msg:
2778
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2779
                      disk.iv_name, node, msg)
2780
        if not ignore_primary or node != instance.primary_node:
2781
          all_result = False
2782
  return all_result
2783

    
2784

    
2785
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2786
  """Checks if a node has enough free memory.
2787

2788
  This function check if a given node has the needed amount of free
2789
  memory. In case the node has less memory or we cannot get the
2790
  information from the node, this function raise an OpPrereqError
2791
  exception.
2792

2793
  @type lu: C{LogicalUnit}
2794
  @param lu: a logical unit from which we get configuration data
2795
  @type node: C{str}
2796
  @param node: the node to check
2797
  @type reason: C{str}
2798
  @param reason: string to use in the error message
2799
  @type requested: C{int}
2800
  @param requested: the amount of memory in MiB to check for
2801
  @type hypervisor_name: C{str}
2802
  @param hypervisor_name: the hypervisor to ask for memory stats
2803
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2804
      we cannot check the node
2805

2806
  """
2807
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2808
  nodeinfo[node].Raise()
2809
  free_mem = nodeinfo[node].data.get('memory_free')
2810
  if not isinstance(free_mem, int):
2811
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2812
                             " was '%s'" % (node, free_mem))
2813
  if requested > free_mem:
2814
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2815
                             " needed %s MiB, available %s MiB" %
2816
                             (node, reason, requested, free_mem))
2817

    
2818

    
2819
class LUStartupInstance(LogicalUnit):
2820
  """Starts an instance.
2821

2822
  """
2823
  HPATH = "instance-start"
2824
  HTYPE = constants.HTYPE_INSTANCE
2825
  _OP_REQP = ["instance_name", "force"]
2826
  REQ_BGL = False
2827

    
2828
  def ExpandNames(self):
2829
    self._ExpandAndLockInstance()
2830

    
2831
  def BuildHooksEnv(self):
2832
    """Build hooks env.
2833

2834
    This runs on master, primary and secondary nodes of the instance.
2835

2836
    """
2837
    env = {
2838
      "FORCE": self.op.force,
2839
      }
2840
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2841
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2842
    return env, nl, nl
2843

    
2844
  def CheckPrereq(self):
2845
    """Check prerequisites.
2846

2847
    This checks that the instance is in the cluster.
2848

2849
    """
2850
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2851
    assert self.instance is not None, \
2852
      "Cannot retrieve locked instance %s" % self.op.instance_name
2853

    
2854
    # extra beparams
2855
    self.beparams = getattr(self.op, "beparams", {})
2856
    if self.beparams:
2857
      if not isinstance(self.beparams, dict):
2858
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2859
                                   " dict" % (type(self.beparams), ))
2860
      # fill the beparams dict
2861
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2862
      self.op.beparams = self.beparams
2863

    
2864
    # extra hvparams
2865
    self.hvparams = getattr(self.op, "hvparams", {})
2866
    if self.hvparams:
2867
      if not isinstance(self.hvparams, dict):
2868
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2869
                                   " dict" % (type(self.hvparams), ))
2870

    
2871
      # check hypervisor parameter syntax (locally)
2872
      cluster = self.cfg.GetClusterInfo()
2873
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2874
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2875
                                    instance.hvparams)
2876
      filled_hvp.update(self.hvparams)
2877
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2878
      hv_type.CheckParameterSyntax(filled_hvp)
2879
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2880
      self.op.hvparams = self.hvparams
2881

    
2882
    _CheckNodeOnline(self, instance.primary_node)
2883

    
2884
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2885
    # check bridges existance
2886
    _CheckInstanceBridgesExist(self, instance)
2887

    
2888
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2889
                                              instance.name,
2890
                                              instance.hypervisor)
2891
    remote_info.Raise()
2892
    if not remote_info.data:
2893
      _CheckNodeFreeMemory(self, instance.primary_node,
2894
                           "starting instance %s" % instance.name,
2895
                           bep[constants.BE_MEMORY], instance.hypervisor)
2896

    
2897
  def Exec(self, feedback_fn):
2898
    """Start the instance.
2899

2900
    """
2901
    instance = self.instance
2902
    force = self.op.force
2903

    
2904
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

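    # soft and hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot is emulated by a shutdown followed by a fresh
    # disk activation and start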
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

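    # the OS create scripts need the instance's disks to be active, so bring
    # them up, run the scripts, and always shut the disks down again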
    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

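    # for file-based instances, the storage directory on the primary node
    # has to be renamed together with the instance itself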
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

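    # live data (and therefore node locking) is only needed if at least one
    # non-static field was requested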
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

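    # prefixes under which every hypervisor and backend parameter is exposed
    # as its own "hv/<name>" or "be/<name>" output field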
    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

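    # an actual migration (as opposed to a cleanup run) additionally requires
    # the target node not to be drained and the source node's hypervisor to
    # confirm that the instance can be migrated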
    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
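    # keep polling all nodes until every disk reports the resync as done,
    # showing the slowest node's progress in the meantime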
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks on node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

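    # whichever node lost the instance is demoted to secondary and the disks
    # are cycled back into single-master mode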
    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node