root / lib / cmdlib.py @ 7c4d6c7b

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import re
30
import platform
31
import logging
32
import copy
33

    
34
from ganeti import ssh
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import serializer
42
from ganeti import ssconf
43

    
44

    
45
class LogicalUnit(object):
46
  """Logical Unit base class.
47

48
  Subclasses must follow these rules:
49
    - implement ExpandNames
50
    - implement CheckPrereq
51
    - implement Exec
52
    - implement BuildHooksEnv
53
    - redefine HPATH and HTYPE
54
    - optionally redefine their run requirements:
55
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56

57
  Note that all commands require root permissions.
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_BGL = True
64

    
65
  def __init__(self, processor, op, context, rpc):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.proc = processor
73
    self.op = op
74
    self.cfg = context.cfg
75
    self.context = context
76
    self.rpc = rpc
77
    # Dicts used to declare locking needs to mcpu
78
    self.needed_locks = None
79
    self.acquired_locks = {}
80
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
81
    self.add_locks = {}
82
    self.remove_locks = {}
83
    # Used to force good behavior when calling helper functions
84
    self.recalculate_locks = {}
85
    self.__ssh = None
86
    # logging
87
    self.LogWarning = processor.LogWarning
88
    self.LogInfo = processor.LogInfo
89

    
90
    for attr_name in self._OP_REQP:
91
      attr_val = getattr(op, attr_name, None)
92
      if attr_val is None:
93
        raise errors.OpPrereqError("Required parameter '%s' missing" %
94
                                   attr_name)
95
    self.CheckArguments()
96

    
97
  def __GetSSH(self):
98
    """Returns the SshRunner object
99

100
    """
101
    if not self.__ssh:
102
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
103
    return self.__ssh
104

    
105
  ssh = property(fget=__GetSSH)
106

    
107
  def CheckArguments(self):
108
    """Check syntactic validity for the opcode arguments.
109

110
    This method is for doing a simple syntactic check and ensuring the
111
    validity of opcode parameters, without any cluster-related
112
    checks. While the same can be accomplished in ExpandNames and/or
113
    CheckPrereq, doing these separately is better because:
114

115
      - ExpandNames is left as purely a lock-related function
116
      - CheckPrereq is run after we have acquired locks (and possibly
117
        waited for them)
118

119
    The function is allowed to change the self.op attribute so that
120
    later methods need no longer worry about missing parameters.
121

122
    """
123
    pass
124

    
125
  def ExpandNames(self):
126
    """Expand names for this LU.
127

128
    This method is called before starting to execute the opcode, and it should
129
    update all the parameters of the opcode to their canonical form (e.g. a
130
    short node name must be fully expanded after this method has successfully
131
    completed). This way locking, hooks, logging, etc. can work correctly.
132

133
    LUs which implement this method must also populate the self.needed_locks
134
    member, as a dict with lock levels as keys, and a list of needed lock names
135
    as values. Rules:
136

137
      - use an empty dict if you don't need any lock
138
      - if you don't need any lock at a particular level omit that level
139
      - don't put anything for the BGL level
140
      - if you want all locks at a level use locking.ALL_SET as a value
141

142
    If you need to share locks (rather than acquire them exclusively) at one
143
    level you can modify self.share_locks, setting a true value (usually 1) for
144
    that level. By default locks are not shared.
145

146
    Examples::
147

148
      # Acquire all nodes and one instance
149
      self.needed_locks = {
150
        locking.LEVEL_NODE: locking.ALL_SET,
151
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
152
      }
153
      # Acquire just two nodes
154
      self.needed_locks = {
155
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
156
      }
157
      # Acquire no locks
158
      self.needed_locks = {} # No, you can't leave it to the default value None
159

160
    """
161
    # The implementation of this method is mandatory only if the new LU is
162
    # concurrent, so that old LUs don't need to be changed all at the same
163
    # time.
164
    if self.REQ_BGL:
165
      self.needed_locks = {} # Exclusive LUs don't need locks.
166
    else:
167
      raise NotImplementedError
168

    
169
  def DeclareLocks(self, level):
170
    """Declare LU locking needs for a level
171

172
    While most LUs can just declare their locking needs at ExpandNames time,
173
    sometimes there's the need to calculate some locks after having acquired
174
    the ones before. This function is called just before acquiring locks at a
175
    particular level, but after acquiring the ones at lower levels, and permits
176
    such calculations. It can be used to modify self.needed_locks, and by
177
    default it does nothing.
178

179
    This function is only called if you have something already set in
180
    self.needed_locks for the level.
181

182
    @param level: Locking level which is going to be locked
183
    @type level: member of ganeti.locking.LEVELS
184

185
    """
186

    
187
  def CheckPrereq(self):
188
    """Check prerequisites for this LU.
189

190
    This method should check that the prerequisites for the execution
191
    of this LU are fulfilled. It can do internode communication, but
192
    it should be idempotent - no cluster or system changes are
193
    allowed.
194

195
    The method should raise errors.OpPrereqError in case something is
196
    not fulfilled. Its return value is ignored.
197

198
    This method should also update all the parameters of the opcode to
199
    their canonical form if it hasn't been done by ExpandNames before.
200

201
    """
202
    raise NotImplementedError
203

    
204
  def Exec(self, feedback_fn):
205
    """Execute the LU.
206

207
    This method should implement the actual work. It should raise
208
    errors.OpExecError for failures that are somewhat dealt with in
209
    code, or expected.
210

211
    """
212
    raise NotImplementedError
213

    
214
  def BuildHooksEnv(self):
215
    """Build hooks environment for this LU.
216

217
    This method should return a three-element tuple consisting of: a dict
218
    containing the environment that will be used for running the
219
    specific hook for this LU, a list of node names on which the hook
220
    should run before the execution, and a list of node names on which
221
    the hook should run after the execution.
222

223
    The keys of the dict must not be prefixed with 'GANETI_', as this will
224
    be handled in the hooks runner. Also note additional keys will be
225
    added by the hooks runner. If the LU doesn't define any
226
    environment, an empty dict (and not None) should be returned.
227

228
    If no nodes are needed, an empty list (and not None) should be returned.
229

230
    Note that if the HPATH for a LU class is None, this function will
231
    not be called.
232

233
    """
234
    raise NotImplementedError
235

    
236
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
237
    """Notify the LU about the results of its hooks.
238

239
    This method is called every time a hooks phase is executed, and notifies
240
    the Logical Unit about the hooks' result. The LU can then use it to alter
241
    its result based on the hooks.  By default the method does nothing and the
242
    previous result is passed back unchanged, but any LU can override it if it
243
    wants to use the local cluster hook-scripts somehow.
244

245
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
246
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
247
    @param hook_results: the results of the multi-node hooks rpc call
248
    @param feedback_fn: function used to send feedback back to the caller
249
    @param lu_result: the previous Exec result this LU had, or None
250
        in the PRE phase
251
    @return: the new Exec result, based on the previous result
252
        and hook results
253

254
    """
255
    return lu_result
256

    
257
  def _ExpandAndLockInstance(self):
258
    """Helper function to expand and lock an instance.
259

260
    Many LUs that work on an instance take its name in self.op.instance_name
261
    and need to expand it and then declare the expanded name for locking. This
262
    function does it, and then updates self.op.instance_name to the expanded
263
    name. It also initializes needed_locks as a dict, if this hasn't been done
264
    before.
265

266
    """
267
    if self.needed_locks is None:
268
      self.needed_locks = {}
269
    else:
270
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
271
        "_ExpandAndLockInstance called with instance-level locks set"
272
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
273
    if expanded_name is None:
274
      raise errors.OpPrereqError("Instance '%s' not known" %
275
                                  self.op.instance_name)
276
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
277
    self.op.instance_name = expanded_name
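
  # Illustrative sketch (hypothetical LU): this helper is normally the first
  # call in an instance-level LU's ExpandNames; combined with the
  # recalculate_locks marker it defers node locking to DeclareLocks (see the
  # sketch after DeclareLocks above):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE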
278

    
279
  def _LockInstancesNodes(self, primary_only=False):
280
    """Helper function to declare instances' nodes for locking.
281

282
    This function should be called after locking one or more instances to lock
283
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
284
    with all primary or secondary nodes for instances already locked and
285
    present in self.needed_locks[locking.LEVEL_INSTANCE].
286

287
    It should be called from DeclareLocks, and for safety only works if
288
    self.recalculate_locks[locking.LEVEL_NODE] is set.
289

290
    In the future it may grow parameters to just lock some instances' nodes, or
291
    to just lock primary or secondary nodes, if needed.
292

293
    It should be called in DeclareLocks in a way similar to::
294

295
      if level == locking.LEVEL_NODE:
296
        self._LockInstancesNodes()
297

298
    @type primary_only: boolean
299
    @param primary_only: only lock primary nodes of locked instances
300

301
    """
302
    assert locking.LEVEL_NODE in self.recalculate_locks, \
303
      "_LockInstancesNodes helper function called with no nodes to recalculate"
304

    
305
    # TODO: check if we've really been called with the instance locks held
306

    
307
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
308
    # future we might want to have different behaviors depending on the value
309
    # of self.recalculate_locks[locking.LEVEL_NODE]
310
    wanted_nodes = []
311
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
312
      instance = self.context.cfg.GetInstanceInfo(instance_name)
313
      wanted_nodes.append(instance.primary_node)
314
      if not primary_only:
315
        wanted_nodes.extend(instance.secondary_nodes)
316

    
317
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
318
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
319
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
320
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
321

    
322
    del self.recalculate_locks[locking.LEVEL_NODE]
323

    
324

    
325
class NoHooksLU(LogicalUnit):
326
  """Simple LU which runs no hooks.
327

328
  This LU is intended as a parent for other LogicalUnits which will
329
  run no hooks, in order to reduce duplicate code.
330

331
  """
332
  HPATH = None
333
  HTYPE = None
334

    
335

    
336
def _GetWantedNodes(lu, nodes):
337
  """Returns list of checked and expanded node names.
338

339
  @type lu: L{LogicalUnit}
340
  @param lu: the logical unit on whose behalf we execute
341
  @type nodes: list
342
  @param nodes: list of node names or None for all nodes
343
  @rtype: list
344
  @return: the list of nodes, sorted
345
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
346

347
  """
348
  if not isinstance(nodes, list):
349
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
350

    
351
  if not nodes:
352
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
353
      " non-empty list of nodes whose name is to be expanded.")
354

    
355
  wanted = []
356
  for name in nodes:
357
    node = lu.cfg.ExpandNodeName(name)
358
    if node is None:
359
      raise errors.OpPrereqError("No such node name '%s'" % name)
360
    wanted.append(node)
361

    
362
  return utils.NiceSort(wanted)
363

    
364

    
365
def _GetWantedInstances(lu, instances):
366
  """Returns list of checked and expanded instance names.
367

368
  @type lu: L{LogicalUnit}
369
  @param lu: the logical unit on whose behalf we execute
370
  @type instances: list
371
  @param instances: list of instance names or None for all instances
372
  @rtype: list
373
  @return: the list of instances, sorted
374
  @raise errors.OpPrereqError: if the instances parameter is wrong type
375
  @raise errors.OpPrereqError: if any of the passed instances is not found
376

377
  """
378
  if not isinstance(instances, list):
379
    raise errors.OpPrereqError("Invalid argument type 'instances'")
380

    
381
  if instances:
382
    wanted = []
383

    
384
    for name in instances:
385
      instance = lu.cfg.ExpandInstanceName(name)
386
      if instance is None:
387
        raise errors.OpPrereqError("No such instance name '%s'" % name)
388
      wanted.append(instance)
389

    
390
  else:
391
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392
  return wanted
393

    
394

    
395
def _CheckOutputFields(static, dynamic, selected):
396
  """Checks whether all selected fields are valid.
397

398
  @type static: L{utils.FieldSet}
399
  @param static: static fields set
400
  @type dynamic: L{utils.FieldSet}
401
  @param dynamic: dynamic fields set
402

403
  """
404
  f = utils.FieldSet()
405
  f.Extend(static)
406
  f.Extend(dynamic)
407

    
408
  delta = f.NonMatching(selected)
409
  if delta:
410
    raise errors.OpPrereqError("Unknown output fields selected: %s"
411
                               % ",".join(delta))
412

    
413

    
414
def _CheckBooleanOpField(op, name):
415
  """Validates boolean opcode parameters.
416

417
  This will ensure that an opcode parameter is either a boolean value,
418
  or None (but that it always exists).
419

420
  """
421
  val = getattr(op, name, None)
422
  if not (val is None or isinstance(val, bool)):
423
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424
                               (name, str(val)))
425
  setattr(op, name, val)
426

    
427

    
428
def _CheckNodeOnline(lu, node):
429
  """Ensure that a given node is online.
430

431
  @param lu: the LU on behalf of which we make the check
432
  @param node: the node to check
433
  @raise errors.OpPrereqError: if the node is offline
434

435
  """
436
  if lu.cfg.GetNodeInfo(node).offline:
437
    raise errors.OpPrereqError("Can't use offline node %s" % node)
438

    
439

    
440
def _CheckNodeNotDrained(lu, node):
441
  """Ensure that a given node is not drained.
442

443
  @param lu: the LU on behalf of which we make the check
444
  @param node: the node to check
445
  @raise errors.OpPrereqError: if the node is drained
446

447
  """
448
  if lu.cfg.GetNodeInfo(node).drained:
449
    raise errors.OpPrereqError("Can't use drained node %s" % node)
450

    
451

    
452
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453
                          memory, vcpus, nics, disk_template, disks,
454
                          bep, hvp, hypervisor_name):
455
  """Builds instance related env variables for hooks
456

457
  This builds the hook environment from individual variables.
458

459
  @type name: string
460
  @param name: the name of the instance
461
  @type primary_node: string
462
  @param primary_node: the name of the instance's primary node
463
  @type secondary_nodes: list
464
  @param secondary_nodes: list of secondary nodes as strings
465
  @type os_type: string
466
  @param os_type: the name of the instance's OS
467
  @type status: boolean
468
  @param status: the should_run status of the instance
469
  @type memory: string
470
  @param memory: the memory size of the instance
471
  @type vcpus: string
472
  @param vcpus: the count of VCPUs the instance has
473
  @type nics: list
474
  @param nics: list of tuples (ip, bridge, mac) representing
475
      the NICs the instance has
476
  @type disk_template: string
477
  @param disk_template: the disk template of the instance
478
  @type disks: list
479
  @param disks: the list of (size, mode) pairs
480
  @type bep: dict
481
  @param bep: the backend parameters for the instance
482
  @type hvp: dict
483
  @param hvp: the hypervisor parameters for the instance
484
  @type hypervisor_name: string
485
  @param hypervisor_name: the hypervisor for the instance
486
  @rtype: dict
487
  @return: the hook environment for this instance
488

489
  """
490
  if status:
491
    str_status = "up"
492
  else:
493
    str_status = "down"
494
  env = {
495
    "OP_TARGET": name,
496
    "INSTANCE_NAME": name,
497
    "INSTANCE_PRIMARY": primary_node,
498
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499
    "INSTANCE_OS_TYPE": os_type,
500
    "INSTANCE_STATUS": str_status,
501
    "INSTANCE_MEMORY": memory,
502
    "INSTANCE_VCPUS": vcpus,
503
    "INSTANCE_DISK_TEMPLATE": disk_template,
504
    "INSTANCE_HYPERVISOR": hypervisor_name,
505
  }
506

    
507
  if nics:
508
    nic_count = len(nics)
509
    for idx, (ip, bridge, mac) in enumerate(nics):
510
      if ip is None:
511
        ip = ""
512
      env["INSTANCE_NIC%d_IP" % idx] = ip
513
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
514
      env["INSTANCE_NIC%d_MAC" % idx] = mac
515
  else:
516
    nic_count = 0
517

    
518
  env["INSTANCE_NIC_COUNT"] = nic_count
519

    
520
  if disks:
521
    disk_count = len(disks)
522
    for idx, (size, mode) in enumerate(disks):
523
      env["INSTANCE_DISK%d_SIZE" % idx] = size
524
      env["INSTANCE_DISK%d_MODE" % idx] = mode
525
  else:
526
    disk_count = 0
527

    
528
  env["INSTANCE_DISK_COUNT"] = disk_count
529

    
530
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
531
    for key, value in source.items():
532
      env["INSTANCE_%s_%s" % (kind, key)] = value
533

    
534
  return env
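
# Illustrative sketch (made-up values): for a running instance with one NIC
# and one disk, the dictionary returned above would look roughly like:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_DISK_TEMPLATE": "drbd",
#     "INSTANCE_HYPERVISOR": "xen-pvm",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_MAC": "aa:00:00:35:0e:9b",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 1024,
#     "INSTANCE_DISK0_MODE": "rw",
#     "INSTANCE_BE_memory": 128,
#     "INSTANCE_HV_kernel_path": "/boot/vmlinuz-2.6-xenU",
#   }
#
# The hooks runner adds the GANETI_ prefix to every key before exporting it
# to the hook scripts.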
535

    
536

    
537
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
538
  """Builds instance related env variables for hooks from an object.
539

540
  @type lu: L{LogicalUnit}
541
  @param lu: the logical unit on whose behalf we execute
542
  @type instance: L{objects.Instance}
543
  @param instance: the instance for which we should build the
544
      environment
545
  @type override: dict
546
  @param override: dictionary with key/values that will override
547
      our values
548
  @rtype: dict
549
  @return: the hook environment dictionary
550

551
  """
552
  cluster = lu.cfg.GetClusterInfo()
553
  bep = cluster.FillBE(instance)
554
  hvp = cluster.FillHV(instance)
555
  args = {
556
    'name': instance.name,
557
    'primary_node': instance.primary_node,
558
    'secondary_nodes': instance.secondary_nodes,
559
    'os_type': instance.os,
560
    'status': instance.admin_up,
561
    'memory': bep[constants.BE_MEMORY],
562
    'vcpus': bep[constants.BE_VCPUS],
563
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
564
    'disk_template': instance.disk_template,
565
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
566
    'bep': bep,
567
    'hvp': hvp,
568
    'hypervisor': instance.hypervisor,
569
  }
570
  if override:
571
    args.update(override)
572
  return _BuildInstanceHookEnv(**args)
573

    
574

    
575
def _AdjustCandidatePool(lu):
576
  """Adjust the candidate pool after node operations.
577

578
  """
579
  mod_list = lu.cfg.MaintainCandidatePool()
580
  if mod_list:
581
    lu.LogInfo("Promoted nodes to master candidate role: %s",
582
               ", ".join(node.name for node in mod_list))
583
    for name in mod_list:
584
      lu.context.ReaddNode(name)
585
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
586
  if mc_now > mc_max:
587
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
588
               (mc_now, mc_max))
589

    
590

    
591
def _CheckInstanceBridgesExist(lu, instance):
592
  """Check that the bridges needed by an instance exist.
593

594
  """
595
  # check bridges existence
596
  brlist = [nic.bridge for nic in instance.nics]
597
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
598
  result.Raise()
599
  if not result.data:
600
    raise errors.OpPrereqError("One or more target bridges %s does not"
601
                               " exist on destination node '%s'" %
602
                               (brlist, instance.primary_node))
603

    
604

    
605
class LUDestroyCluster(NoHooksLU):
606
  """Logical unit for destroying the cluster.
607

608
  """
609
  _OP_REQP = []
610

    
611
  def CheckPrereq(self):
612
    """Check prerequisites.
613

614
    This checks whether the cluster is empty.
615

616
    Any errors are signaled by raising errors.OpPrereqError.
617

618
    """
619
    master = self.cfg.GetMasterNode()
620

    
621
    nodelist = self.cfg.GetNodeList()
622
    if len(nodelist) != 1 or nodelist[0] != master:
623
      raise errors.OpPrereqError("There are still %d node(s) in"
624
                                 " this cluster." % (len(nodelist) - 1))
625
    instancelist = self.cfg.GetInstanceList()
626
    if instancelist:
627
      raise errors.OpPrereqError("There are still %d instance(s) in"
628
                                 " this cluster." % len(instancelist))
629

    
630
  def Exec(self, feedback_fn):
631
    """Destroys the cluster.
632

633
    """
634
    master = self.cfg.GetMasterNode()
635
    result = self.rpc.call_node_stop_master(master, False)
636
    result.Raise()
637
    if not result.data:
638
      raise errors.OpExecError("Could not disable the master role")
639
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640
    utils.CreateBackup(priv_key)
641
    utils.CreateBackup(pub_key)
642
    return master
643

    
644

    
645
class LUVerifyCluster(LogicalUnit):
646
  """Verifies the cluster status.
647

648
  """
649
  HPATH = "cluster-verify"
650
  HTYPE = constants.HTYPE_CLUSTER
651
  _OP_REQP = ["skip_checks"]
652
  REQ_BGL = False
653

    
654
  def ExpandNames(self):
655
    self.needed_locks = {
656
      locking.LEVEL_NODE: locking.ALL_SET,
657
      locking.LEVEL_INSTANCE: locking.ALL_SET,
658
    }
659
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
660

    
661
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
662
                  node_result, feedback_fn, master_files,
663
                  drbd_map, vg_name):
664
    """Run multiple tests against a node.
665

666
    Test list:
667

668
      - compares ganeti version
669
      - checks vg existence and size > 20G
670
      - checks config file checksum
671
      - checks ssh to other nodes
672

673
    @type nodeinfo: L{objects.Node}
674
    @param nodeinfo: the node to check
675
    @param file_list: required list of files
676
    @param local_cksum: dictionary of local files and their checksums
677
    @param node_result: the results from the node
678
    @param feedback_fn: function used to accumulate results
679
    @param master_files: list of files that only masters should have
680
    @param drbd_map: the used DRBD minors for this node, in
681
        the form of minor: (instance, must_exist), which correspond to instances
682
        and their running status
683
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
684

685
    """
686
    node = nodeinfo.name
687

    
688
    # main result, node_result should be a non-empty dict
689
    if not node_result or not isinstance(node_result, dict):
690
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
691
      return True
692

    
693
    # compares ganeti version
694
    local_version = constants.PROTOCOL_VERSION
695
    remote_version = node_result.get('version', None)
696
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
697
            len(remote_version) == 2):
698
      feedback_fn("  - ERROR: connection to %s failed" % (node))
699
      return True
700

    
701
    if local_version != remote_version[0]:
702
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
703
                  " node %s %s" % (local_version, node, remote_version[0]))
704
      return True
705

    
706
    # node seems compatible, we can actually try to look into its results
707

    
708
    bad = False
709

    
710
    # full package version
711
    if constants.RELEASE_VERSION != remote_version[1]:
712
      feedback_fn("  - WARNING: software version mismatch: master %s,"
713
                  " node %s %s" %
714
                  (constants.RELEASE_VERSION, node, remote_version[1]))
715

    
716
    # checks vg existence and size > 20G
717
    if vg_name is not None:
718
      vglist = node_result.get(constants.NV_VGLIST, None)
719
      if not vglist:
720
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
721
                        (node,))
722
        bad = True
723
      else:
724
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
725
                                              constants.MIN_VG_SIZE)
726
        if vgstatus:
727
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728
          bad = True
729

    
730
    # checks config file checksum
731

    
732
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
733
    if not isinstance(remote_cksum, dict):
734
      bad = True
735
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
736
    else:
737
      for file_name in file_list:
738
        node_is_mc = nodeinfo.master_candidate
739
        must_have_file = file_name not in master_files
740
        if file_name not in remote_cksum:
741
          if node_is_mc or must_have_file:
742
            bad = True
743
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
744
        elif remote_cksum[file_name] != local_cksum[file_name]:
745
          if node_is_mc or must_have_file:
746
            bad = True
747
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
748
          else:
749
            # not candidate and this is not a must-have file
750
            bad = True
751
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
752
                        " candidates (and the file is outdated)" % file_name)
753
        else:
754
          # all good, except non-master/non-must have combination
755
          if not node_is_mc and not must_have_file:
756
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
757
                        " candidates" % file_name)
758

    
759
    # checks ssh to any
760

    
761
    if constants.NV_NODELIST not in node_result:
762
      bad = True
763
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
764
    else:
765
      if node_result[constants.NV_NODELIST]:
766
        bad = True
767
        for node in node_result[constants.NV_NODELIST]:
768
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
769
                          (node, node_result[constants.NV_NODELIST][node]))
770

    
771
    if constants.NV_NODENETTEST not in node_result:
772
      bad = True
773
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
774
    else:
775
      if node_result[constants.NV_NODENETTEST]:
776
        bad = True
777
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
778
        for node in nlist:
779
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
780
                          (node, node_result[constants.NV_NODENETTEST][node]))
781

    
782
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
783
    if isinstance(hyp_result, dict):
784
      for hv_name, hv_result in hyp_result.iteritems():
785
        if hv_result is not None:
786
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
787
                      (hv_name, hv_result))
788

    
789
    # check used drbd list
790
    if vg_name is not None:
791
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
792
      if not isinstance(used_minors, (tuple, list)):
793
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
794
                    str(used_minors))
795
      else:
796
        for minor, (iname, must_exist) in drbd_map.items():
797
          if minor not in used_minors and must_exist:
798
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
799
                        " not active" % (minor, iname))
800
            bad = True
801
        for minor in used_minors:
802
          if minor not in drbd_map:
803
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
804
                        minor)
805
            bad = True
806

    
807
    return bad
808

    
809
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810
                      node_instance, feedback_fn, n_offline):
811
    """Verify an instance.
812

813
    This function checks to see if the required block devices are
814
    available on the instance's node.
815

816
    """
817
    bad = False
818

    
819
    node_current = instanceconfig.primary_node
820

    
821
    node_vol_should = {}
822
    instanceconfig.MapLVsByNode(node_vol_should)
823

    
824
    for node in node_vol_should:
825
      if node in n_offline:
826
        # ignore missing volumes on offline nodes
827
        continue
828
      for volume in node_vol_should[node]:
829
        if node not in node_vol_is or volume not in node_vol_is[node]:
830
          feedback_fn("  - ERROR: volume %s missing on node %s" %
831
                          (volume, node))
832
          bad = True
833

    
834
    if instanceconfig.admin_up:
835
      if ((node_current not in node_instance or
836
          not instance in node_instance[node_current]) and
837
          node_current not in n_offline):
838
        feedback_fn("  - ERROR: instance %s not running on node %s" %
839
                        (instance, node_current))
840
        bad = True
841

    
842
    for node in node_instance:
843
      if node != node_current:
844
        if instance in node_instance[node]:
845
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
846
                          (instance, node))
847
          bad = True
848

    
849
    return bad
850

    
851
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852
    """Verify if there are any unknown volumes in the cluster.
853

854
    The .os, .swap and backup volumes are ignored. All other volumes are
855
    reported as unknown.
856

857
    """
858
    bad = False
859

    
860
    for node in node_vol_is:
861
      for volume in node_vol_is[node]:
862
        if node not in node_vol_should or volume not in node_vol_should[node]:
863
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864
                      (volume, node))
865
          bad = True
866
    return bad
867

    
868
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869
    """Verify the list of running instances.
870

871
    This checks what instances are running but unknown to the cluster.
872

873
    """
874
    bad = False
875
    for node in node_instance:
876
      for runninginstance in node_instance[node]:
877
        if runninginstance not in instancelist:
878
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879
                          (runninginstance, node))
880
          bad = True
881
    return bad
882

    
883
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
884
    """Verify N+1 Memory Resilience.
885

886
    Check that if one single node dies we can still start all the instances it
887
    was primary for.
888

889
    """
890
    bad = False
891

    
892
    for node, nodeinfo in node_info.iteritems():
893
      # This code checks that every node which is now listed as secondary has
894
      # enough memory to host all instances it is supposed to, should a single
895
      # other node in the cluster fail.
896
      # FIXME: not ready for failover to an arbitrary node
897
      # FIXME: does not support file-backed instances
898
      # WARNING: we currently take into account down instances as well as up
899
      # ones, considering that even if they're down someone might want to start
900
      # them even in the event of a node failure.
901
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
902
        needed_mem = 0
903
        for instance in instances:
904
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
905
          if bep[constants.BE_AUTO_BALANCE]:
906
            needed_mem += bep[constants.BE_MEMORY]
907
        if nodeinfo['mfree'] < needed_mem:
908
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
909
                      " failovers should node %s fail" % (node, prinode))
910
          bad = True
911
    return bad
912

    
913
  def CheckPrereq(self):
914
    """Check prerequisites.
915

916
    Transform the list of checks we're going to skip into a set and check that
917
    all its members are valid.
918

919
    """
920
    self.skip_set = frozenset(self.op.skip_checks)
921
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
922
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
923

    
924
  def BuildHooksEnv(self):
925
    """Build hooks env.
926

927
    Cluster-Verify hooks are run only in the post phase; if they fail, their
928
    output is logged in the verify output and the verification fails.
929

930
    """
931
    all_nodes = self.cfg.GetNodeList()
932
    env = {
933
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
934
      }
935
    for node in self.cfg.GetAllNodesInfo().values():
936
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
937

    
938
    return env, [], all_nodes
939

    
940
  def Exec(self, feedback_fn):
941
    """Verify integrity of cluster, performing various test on nodes.
942

943
    """
944
    bad = False
945
    feedback_fn("* Verifying global settings")
946
    for msg in self.cfg.VerifyConfig():
947
      feedback_fn("  - ERROR: %s" % msg)
948

    
949
    vg_name = self.cfg.GetVGName()
950
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
951
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
952
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
953
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
954
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
955
                        for iname in instancelist)
956
    i_non_redundant = [] # Non redundant instances
957
    i_non_a_balanced = [] # Non auto-balanced instances
958
    n_offline = [] # List of offline nodes
959
    n_drained = [] # List of nodes being drained
960
    node_volume = {}
961
    node_instance = {}
962
    node_info = {}
963
    instance_cfg = {}
964

    
965
    # FIXME: verify OS list
966
    # do local checksums
967
    master_files = [constants.CLUSTER_CONF_FILE]
968

    
969
    file_names = ssconf.SimpleStore().GetFileList()
970
    file_names.append(constants.SSL_CERT_FILE)
971
    file_names.append(constants.RAPI_CERT_FILE)
972
    file_names.extend(master_files)
973

    
974
    local_checksums = utils.FingerprintFiles(file_names)
975

    
976
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
977
    node_verify_param = {
978
      constants.NV_FILELIST: file_names,
979
      constants.NV_NODELIST: [node.name for node in nodeinfo
980
                              if not node.offline],
981
      constants.NV_HYPERVISOR: hypervisors,
982
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
983
                                  node.secondary_ip) for node in nodeinfo
984
                                 if not node.offline],
985
      constants.NV_INSTANCELIST: hypervisors,
986
      constants.NV_VERSION: None,
987
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
988
      }
989
    if vg_name is not None:
990
      node_verify_param[constants.NV_VGLIST] = None
991
      node_verify_param[constants.NV_LVLIST] = vg_name
992
      node_verify_param[constants.NV_DRBDLIST] = None
993
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
994
                                           self.cfg.GetClusterName())
995

    
996
    cluster = self.cfg.GetClusterInfo()
997
    master_node = self.cfg.GetMasterNode()
998
    all_drbd_map = self.cfg.ComputeDRBDMap()
999

    
1000
    for node_i in nodeinfo:
1001
      node = node_i.name
1002
      nresult = all_nvinfo[node].data
1003

    
1004
      if node_i.offline:
1005
        feedback_fn("* Skipping offline node %s" % (node,))
1006
        n_offline.append(node)
1007
        continue
1008

    
1009
      if node == master_node:
1010
        ntype = "master"
1011
      elif node_i.master_candidate:
1012
        ntype = "master candidate"
1013
      elif node_i.drained:
1014
        ntype = "drained"
1015
        n_drained.append(node)
1016
      else:
1017
        ntype = "regular"
1018
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1019

    
1020
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1021
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1022
        bad = True
1023
        continue
1024

    
1025
      node_drbd = {}
1026
      for minor, instance in all_drbd_map[node].items():
1027
        if instance not in instanceinfo:
1028
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1029
                      instance)
1030
          # ghost instance should not be running, but otherwise we
1031
          # don't give double warnings (both ghost instance and
1032
          # unallocated minor in use)
1033
          node_drbd[minor] = (instance, False)
1034
        else:
1035
          instance = instanceinfo[instance]
1036
          node_drbd[minor] = (instance.name, instance.admin_up)
1037
      result = self._VerifyNode(node_i, file_names, local_checksums,
1038
                                nresult, feedback_fn, master_files,
1039
                                node_drbd, vg_name)
1040
      bad = bad or result
1041

    
1042
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1043
      if vg_name is None:
1044
        node_volume[node] = {}
1045
      elif isinstance(lvdata, basestring):
1046
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1047
                    (node, utils.SafeEncode(lvdata)))
1048
        bad = True
1049
        node_volume[node] = {}
1050
      elif not isinstance(lvdata, dict):
1051
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1052
        bad = True
1053
        continue
1054
      else:
1055
        node_volume[node] = lvdata
1056

    
1057
      # node_instance
1058
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1059
      if not isinstance(idata, list):
1060
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1061
                    (node,))
1062
        bad = True
1063
        continue
1064

    
1065
      node_instance[node] = idata
1066

    
1067
      # node_info
1068
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1069
      if not isinstance(nodeinfo, dict):
1070
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1071
        bad = True
1072
        continue
1073

    
1074
      try:
1075
        node_info[node] = {
1076
          "mfree": int(nodeinfo['memory_free']),
1077
          "pinst": [],
1078
          "sinst": [],
1079
          # dictionary holding all instances this node is secondary for,
1080
          # grouped by their primary node. Each key is a cluster node, and each
1081
          # value is a list of instances which have the key as primary and the
1082
          # current node as secondary. This is handy to calculate N+1 memory
1083
          # availability if you can only failover from a primary to its
1084
          # secondary.
1085
          "sinst-by-pnode": {},
1086
        }
1087
        # FIXME: devise a free space model for file based instances as well
1088
        if vg_name is not None:
1089
          if (constants.NV_VGLIST not in nresult or
1090
              vg_name not in nresult[constants.NV_VGLIST]):
1091
            feedback_fn("  - ERROR: node %s didn't return data for the"
1092
                        " volume group '%s' - it is either missing or broken" %
1093
                        (node, vg_name))
1094
            bad = True
1095
            continue
1096
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1097
      except (ValueError, KeyError):
1098
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
1099
                    " from node %s" % (node,))
1100
        bad = True
1101
        continue
1102

    
1103
    node_vol_should = {}
1104

    
1105
    for instance in instancelist:
1106
      feedback_fn("* Verifying instance %s" % instance)
1107
      inst_config = instanceinfo[instance]
1108
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1109
                                     node_instance, feedback_fn, n_offline)
1110
      bad = bad or result
1111
      inst_nodes_offline = []
1112

    
1113
      inst_config.MapLVsByNode(node_vol_should)
1114

    
1115
      instance_cfg[instance] = inst_config
1116

    
1117
      pnode = inst_config.primary_node
1118
      if pnode in node_info:
1119
        node_info[pnode]['pinst'].append(instance)
1120
      elif pnode not in n_offline:
1121
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1122
                    " %s failed" % (instance, pnode))
1123
        bad = True
1124

    
1125
      if pnode in n_offline:
1126
        inst_nodes_offline.append(pnode)
1127

    
1128
      # If the instance is non-redundant we cannot survive losing its primary
1129
      # node, so we are not N+1 compliant. On the other hand we have no disk
1130
      # templates with more than one secondary so that situation is not well
1131
      # supported either.
1132
      # FIXME: does not support file-backed instances
1133
      if len(inst_config.secondary_nodes) == 0:
1134
        i_non_redundant.append(instance)
1135
      elif len(inst_config.secondary_nodes) > 1:
1136
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1137
                    % instance)
1138

    
1139
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1140
        i_non_a_balanced.append(instance)
1141

    
1142
      for snode in inst_config.secondary_nodes:
1143
        if snode in node_info:
1144
          node_info[snode]['sinst'].append(instance)
1145
          if pnode not in node_info[snode]['sinst-by-pnode']:
1146
            node_info[snode]['sinst-by-pnode'][pnode] = []
1147
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1148
        elif snode not in n_offline:
1149
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1150
                      " %s failed" % (instance, snode))
1151
          bad = True
1152
        if snode in n_offline:
1153
          inst_nodes_offline.append(snode)
1154

    
1155
      if inst_nodes_offline:
1156
        # warn that the instance lives on offline nodes, and set bad=True
1157
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1158
                    ", ".join(inst_nodes_offline))
1159
        bad = True
1160

    
1161
    feedback_fn("* Verifying orphan volumes")
1162
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1163
                                       feedback_fn)
1164
    bad = bad or result
1165

    
1166
    feedback_fn("* Verifying remaining instances")
1167
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1168
                                         feedback_fn)
1169
    bad = bad or result
1170

    
1171
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1172
      feedback_fn("* Verifying N+1 Memory redundancy")
1173
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1174
      bad = bad or result
1175

    
1176
    feedback_fn("* Other Notes")
1177
    if i_non_redundant:
1178
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1179
                  % len(i_non_redundant))
1180

    
1181
    if i_non_a_balanced:
1182
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1183
                  % len(i_non_a_balanced))
1184

    
1185
    if n_offline:
1186
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1187

    
1188
    if n_drained:
1189
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1190

    
1191
    return not bad
1192

    
1193
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1194
    """Analyze the post-hooks' result
1195

1196
    This method analyses the hook result, handles it, and sends some
1197
    nicely-formatted feedback back to the user.
1198

1199
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1200
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1201
    @param hooks_results: the results of the multi-node hooks rpc call
1202
    @param feedback_fn: function used send feedback back to the caller
1203
    @param lu_result: previous Exec result
1204
    @return: the new Exec result, based on the previous result
1205
        and hook results
1206

1207
    """
1208
    # We only really run POST phase hooks, and are only interested in
1209
    # their results
1210
    if phase == constants.HOOKS_PHASE_POST:
1211
      # Used to change hooks' output to proper indentation
1212
      indent_re = re.compile('^', re.M)
1213
      feedback_fn("* Hooks Results")
1214
      if not hooks_results:
1215
        feedback_fn("  - ERROR: general communication failure")
1216
        lu_result = 1
1217
      else:
1218
        for node_name in hooks_results:
1219
          show_node_header = True
1220
          res = hooks_results[node_name]
1221
          if res.failed or res.data is False or not isinstance(res.data, list):
1222
            if res.offline:
1223
              # no need to warn or set fail return value
1224
              continue
1225
            feedback_fn("    Communication failure in hooks execution")
1226
            lu_result = 1
1227
            continue
1228
          for script, hkr, output in res.data:
1229
            if hkr == constants.HKR_FAIL:
1230
              # The node header is only shown once, if there are
1231
              # failing hooks on that node
1232
              if show_node_header:
1233
                feedback_fn("  Node %s:" % node_name)
1234
                show_node_header = False
1235
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1236
              output = indent_re.sub('      ', output)
1237
              feedback_fn("%s" % output)
1238
              lu_result = 1
1239

    
1240
      return lu_result
1241

    
1242

    
1243
class LUVerifyDisks(NoHooksLU):
1244
  """Verifies the cluster disks status.
1245

1246
  """
1247
  _OP_REQP = []
1248
  REQ_BGL = False
1249

    
1250
  def ExpandNames(self):
1251
    self.needed_locks = {
1252
      locking.LEVEL_NODE: locking.ALL_SET,
1253
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1254
    }
1255
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1256

    
1257
  def CheckPrereq(self):
1258
    """Check prerequisites.
1259

1260
    This has no prerequisites.
1261

1262
    """
1263
    pass
1264

    
1265
  def Exec(self, feedback_fn):
1266
    """Verify integrity of cluster disks.
1267

1268
    """
1269
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1270

    
1271
    vg_name = self.cfg.GetVGName()
1272
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1273
    instances = [self.cfg.GetInstanceInfo(name)
1274
                 for name in self.cfg.GetInstanceList()]
1275

    
1276
    nv_dict = {}
1277
    for inst in instances:
1278
      inst_lvs = {}
1279
      if (not inst.admin_up or
1280
          inst.disk_template not in constants.DTS_NET_MIRROR):
1281
        continue
1282
      inst.MapLVsByNode(inst_lvs)
1283
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1284
      for node, vol_list in inst_lvs.iteritems():
1285
        for vol in vol_list:
1286
          nv_dict[(node, vol)] = inst
1287

    
1288
    if not nv_dict:
1289
      return result
1290

    
1291
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1292

    
1293
    for node in nodes:
1294
      # node_volume
1295
      lvs = node_lvs[node]
1296
      if lvs.failed:
1297
        if not lvs.offline:
1298
          self.LogWarning("Connection to node %s failed: %s" %
1299
                          (node, lvs.data))
1300
        continue
1301
      lvs = lvs.data
1302
      if isinstance(lvs, basestring):
1303
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1304
        res_nlvm[node] = lvs
1305
        continue
1306
      elif not isinstance(lvs, dict):
1307
        logging.warning("Connection to node %s failed or invalid data"
1308
                        " returned", node)
1309
        res_nodes.append(node)
1310
        continue
1311

    
1312
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1313
        inst = nv_dict.pop((node, lv_name), None)
1314
        if (not lv_online and inst is not None
1315
            and inst.name not in res_instances):
1316
          res_instances.append(inst.name)
1317

    
1318
    # any leftover items in nv_dict are missing LVs, let's arrange the
1319
    # data better
1320
    for key, inst in nv_dict.iteritems():
1321
      if inst.name not in res_missing:
1322
        res_missing[inst.name] = []
1323
      res_missing[inst.name].append(key)
1324

    
1325
    return result
1326

    
1327

    
1328
class LURenameCluster(LogicalUnit):
1329
  """Rename the cluster.
1330

1331
  """
1332
  HPATH = "cluster-rename"
1333
  HTYPE = constants.HTYPE_CLUSTER
1334
  _OP_REQP = ["name"]
1335

    
1336
  def BuildHooksEnv(self):
1337
    """Build hooks env.
1338

1339
    """
1340
    env = {
1341
      "OP_TARGET": self.cfg.GetClusterName(),
1342
      "NEW_NAME": self.op.name,
1343
      }
1344
    mn = self.cfg.GetMasterNode()
1345
    return env, [mn], [mn]
1346

    
1347
  def CheckPrereq(self):
1348
    """Verify that the passed name is a valid one.
1349

1350
    """
1351
    hostname = utils.HostInfo(self.op.name)
1352

    
1353
    new_name = hostname.name
1354
    self.ip = new_ip = hostname.ip
1355
    old_name = self.cfg.GetClusterName()
1356
    old_ip = self.cfg.GetMasterIP()
1357
    if new_name == old_name and new_ip == old_ip:
1358
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1359
                                 " cluster has changed")
1360
    if new_ip != old_ip:
1361
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1362
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1363
                                   " reachable on the network. Aborting." %
1364
                                   new_ip)
1365

    
1366
    self.op.name = new_name
1367

    
1368
  def Exec(self, feedback_fn):
1369
    """Rename the cluster.
1370

1371
    """
1372
    clustername = self.op.name
1373
    ip = self.ip
1374

    
1375
    # shutdown the master IP
1376
    master = self.cfg.GetMasterNode()
1377
    result = self.rpc.call_node_stop_master(master, False)
1378
    if result.failed or not result.data:
1379
      raise errors.OpExecError("Could not disable the master role")
1380

    
1381
    try:
1382
      cluster = self.cfg.GetClusterInfo()
1383
      cluster.cluster_name = clustername
1384
      cluster.master_ip = ip
1385
      self.cfg.Update(cluster)
1386

    
1387
      # update the known hosts file
1388
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1389
      node_list = self.cfg.GetNodeList()
1390
      try:
1391
        node_list.remove(master)
1392
      except ValueError:
1393
        pass
1394
      result = self.rpc.call_upload_file(node_list,
1395
                                         constants.SSH_KNOWN_HOSTS_FILE)
1396
      for to_node, to_result in result.iteritems():
1397
        if to_result.failed or not to_result.data:
1398
          logging.error("Copy of file %s to node %s failed",
1399
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1400

    
1401
    finally:
1402
      result = self.rpc.call_node_start_master(master, False)
1403
      if result.failed or not result.data:
1404
        self.LogWarning("Could not re-enable the master role on"
1405
                        " the master, please restart manually.")
1406

    
1407

    
1408
def _RecursiveCheckIfLVMBased(disk):
1409
  """Check if the given disk or its children are lvm-based.
1410

1411
  @type disk: L{objects.Disk}
1412
  @param disk: the disk to check
1413
  @rtype: boolean
1414
  @return: boolean indicating whether a LD_LV dev_type was found or not
1415

1416
  """
1417
  if disk.children:
1418
    for chdisk in disk.children:
1419
      if _RecursiveCheckIfLVMBased(chdisk):
1420
        return True
1421
  return disk.dev_type == constants.LD_LV
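
# Illustrative sketch (made-up disk layout): a DRBD8 disk whose children are
# two logical volumes (data and metadata) counts as lvm-based, because the
# recursion above finds LD_LV leaves:
#
#   lv_data = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8,
#                       children=[lv_data, lv_meta])
#   assert _RecursiveCheckIfLVMBased(drbd)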
1422

    
1423

    
1424
class LUSetClusterParams(LogicalUnit):
1425
  """Change the parameters of the cluster.
1426

1427
  """
1428
  HPATH = "cluster-modify"
1429
  HTYPE = constants.HTYPE_CLUSTER
1430
  _OP_REQP = []
1431
  REQ_BGL = False
1432

    
1433
  def CheckArguments(self):
1434
    """Check parameters
1435

1436
    """
1437
    if not hasattr(self.op, "candidate_pool_size"):
1438
      self.op.candidate_pool_size = None
1439
    if self.op.candidate_pool_size is not None:
1440
      try:
1441
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1442
      except (ValueError, TypeError), err:
1443
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1444
                                   str(err))
1445
      if self.op.candidate_pool_size < 1:
1446
        raise errors.OpPrereqError("At least one master candidate needed")
1447

    
1448
  def ExpandNames(self):
1449
    # FIXME: in the future maybe other cluster params won't require checking on
1450
    # all nodes to be modified.
1451
    self.needed_locks = {
1452
      locking.LEVEL_NODE: locking.ALL_SET,
1453
    }
1454
    self.share_locks[locking.LEVEL_NODE] = 1
1455

    
1456
  def BuildHooksEnv(self):
1457
    """Build hooks env.
1458

1459
    """
1460
    env = {
1461
      "OP_TARGET": self.cfg.GetClusterName(),
1462
      "NEW_VG_NAME": self.op.vg_name,
1463
      }
1464
    mn = self.cfg.GetMasterNode()
1465
    return env, [mn], [mn]
1466

    
1467
  def CheckPrereq(self):
1468
    """Check prerequisites.
1469

1470
    This checks whether the given params don't conflict and
1471
    if the given volume group is valid.
1472

1473
    """
1474
    if self.op.vg_name is not None and not self.op.vg_name:
1475
      instances = self.cfg.GetAllInstancesInfo().values()
1476
      for inst in instances:
1477
        for disk in inst.disks:
1478
          if _RecursiveCheckIfLVMBased(disk):
1479
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1480
                                       " lvm-based instances exist")
1481

    
1482
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1483

    
1484
    # if vg_name not None, checks given volume group on all nodes
1485
    if self.op.vg_name:
1486
      vglist = self.rpc.call_vg_list(node_list)
1487
      for node in node_list:
1488
        if vglist[node].failed:
1489
          # ignoring down node
1490
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1491
          continue
1492
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1493
                                              self.op.vg_name,
1494
                                              constants.MIN_VG_SIZE)
1495
        if vgstatus:
1496
          raise errors.OpPrereqError("Error on node '%s': %s" %
1497
                                     (node, vgstatus))
1498

    
1499
    self.cluster = cluster = self.cfg.GetClusterInfo()
1500
    # validate beparams changes
1501
    if self.op.beparams:
1502
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1503
      self.new_beparams = cluster.FillDict(
1504
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1505

    
1506
    # hypervisor list/parameters
1507
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
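    # start from a (filled) copy of the current hvparams and merge in the
    # requested per-hypervisor changes; unspecified keys keep their values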
1508
    if self.op.hvparams:
1509
      if not isinstance(self.op.hvparams, dict):
1510
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1511
      for hv_name, hv_dict in self.op.hvparams.items():
1512
        if hv_name not in self.new_hvparams:
1513
          self.new_hvparams[hv_name] = hv_dict
1514
        else:
1515
          self.new_hvparams[hv_name].update(hv_dict)
1516

    
1517
    if self.op.enabled_hypervisors is not None:
1518
      self.hv_list = self.op.enabled_hypervisors
1519
    else:
1520
      self.hv_list = cluster.enabled_hypervisors
1521

    
1522
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1523
      # either the enabled list has changed, or the parameters have, validate
1524
      for hv_name, hv_params in self.new_hvparams.items():
1525
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1526
            (self.op.enabled_hypervisors and
1527
             hv_name in self.op.enabled_hypervisors)):
1528
          # either this is a new hypervisor, or its parameters have changed
1529
          hv_class = hypervisor.GetHypervisor(hv_name)
1530
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1531
          hv_class.CheckParameterSyntax(hv_params)
1532
          _CheckHVParams(self, node_list, hv_name, hv_params)
1533

    
1534
  def Exec(self, feedback_fn):
1535
    """Change the parameters of the cluster.
1536

1537
    """
1538
    if self.op.vg_name is not None:
1539
      new_volume = self.op.vg_name
1540
      if not new_volume:
1541
        new_volume = None
1542
      if new_volume != self.cfg.GetVGName():
1543
        self.cfg.SetVGName(new_volume)
1544
      else:
1545
        feedback_fn("Cluster LVM configuration already in desired"
1546
                    " state, not changing")
1547
    if self.op.hvparams:
1548
      self.cluster.hvparams = self.new_hvparams
1549
    if self.op.enabled_hypervisors is not None:
1550
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1551
    if self.op.beparams:
1552
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1553
    if self.op.candidate_pool_size is not None:
1554
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1555
      # we need to update the pool size here, otherwise the save will fail
1556
      _AdjustCandidatePool(self)
1557

    
1558
    self.cfg.Update(self.cluster)
1559

    
1560

    
1561
class LURedistributeConfig(NoHooksLU):
1562
  """Force the redistribution of cluster configuration.
1563

1564
  This is a very simple LU.
1565

1566
  """
1567
  _OP_REQP = []
1568
  REQ_BGL = False
1569

    
1570
  def ExpandNames(self):
1571
    self.needed_locks = {
1572
      locking.LEVEL_NODE: locking.ALL_SET,
1573
    }
1574
    self.share_locks[locking.LEVEL_NODE] = 1
1575

    
1576
  def CheckPrereq(self):
1577
    """Check prerequisites.
1578

1579
    """
1580

    
1581
  def Exec(self, feedback_fn):
1582
    """Redistribute the configuration.
1583

1584
    """
1585
    self.cfg.Update(self.cfg.GetClusterInfo())
1586

    
1587

    
1588
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1589
  """Sleep and poll for an instance's disk to sync.
1590

1591
  """
1592
  if not instance.disks:
1593
    return True
1594

    
1595
  if not oneshot:
1596
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1597

    
1598
  node = instance.primary_node
1599

    
1600
  for dev in instance.disks:
1601
    lu.cfg.SetDiskID(dev, node)
1602

    
1603
  retries = 0
1604
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1605
  while True:
1606
    max_time = 0
1607
    done = True
1608
    cumul_degraded = False
1609
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1610
    if rstats.failed or not rstats.data:
1611
      lu.LogWarning("Can't get any data from node %s", node)
1612
      retries += 1
1613
      if retries >= 10:
1614
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1615
                                 " aborting." % node)
1616
      time.sleep(6)
1617
      continue
1618
    rstats = rstats.data
1619
    retries = 0
1620
    for i, mstat in enumerate(rstats):
1621
      if mstat is None:
1622
        lu.LogWarning("Can't compute data for node %s/%s",
1623
                           node, instance.disks[i].iv_name)
1624
        continue
1625
      # we ignore the ldisk parameter
1626
      perc_done, est_time, is_degraded, _ = mstat
1627
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1628
      if perc_done is not None:
1629
        done = False
1630
        if est_time is not None:
1631
          rem_time = "%d estimated seconds remaining" % est_time
1632
          max_time = est_time
1633
        else:
1634
          rem_time = "no time estimate"
1635
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1636
                        (instance.disks[i].iv_name, perc_done, rem_time))
1637

    
1638
    # if we're done but degraded, let's do a few small retries, to
1639
    # make sure we see a stable and not transient situation; therefore
1640
    # we force restart of the loop
1641
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1642
      logging.info("Degraded disks found, %d retries left", degr_retries)
1643
      degr_retries -= 1
1644
      time.sleep(1)
1645
      continue
1646

    
1647
    if done or oneshot:
1648
      break
1649

    
1650
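    # sleep for the longest estimated sync time reported, capped at 60 seconds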
    time.sleep(min(60, max_time))
1651

    
1652
  if done:
1653
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1654
  return not cumul_degraded
1655

    
1656

    
1657
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1658
  """Check that mirrors are not degraded.
1659

1660
  The ldisk parameter, if True, will change the test from the
1661
  is_degraded attribute (which represents overall non-ok status for
1662
  the device(s)) to the ldisk (representing the local storage status).
1663

1664
  """
1665
  lu.cfg.SetDiskID(dev, node)
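  # select which status field to test in the blockdev_find result:
  # index 6 is the ldisk (local storage) status, index 5 is is_degraded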
1666
  if ldisk:
1667
    idx = 6
1668
  else:
1669
    idx = 5
1670

    
1671
  result = True
1672
  if on_primary or dev.AssembleOnSecondary():
1673
    rstats = lu.rpc.call_blockdev_find(node, dev)
1674
    msg = rstats.RemoteFailMsg()
1675
    if msg:
1676
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1677
      result = False
1678
    elif not rstats.payload:
1679
      lu.LogWarning("Can't find disk on node %s", node)
1680
      result = False
1681
    else:
1682
      result = result and (not rstats.payload[idx])
1683
  if dev.children:
1684
    for child in dev.children:
1685
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1686

    
1687
  return result
1688

    
1689

    
1690
class LUDiagnoseOS(NoHooksLU):
1691
  """Logical unit for OS diagnose/query.
1692

1693
  """
1694
  _OP_REQP = ["output_fields", "names"]
1695
  REQ_BGL = False
1696
  _FIELDS_STATIC = utils.FieldSet()
1697
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1698

    
1699
  def ExpandNames(self):
1700
    if self.op.names:
1701
      raise errors.OpPrereqError("Selective OS query not supported")
1702

    
1703
    _CheckOutputFields(static=self._FIELDS_STATIC,
1704
                       dynamic=self._FIELDS_DYNAMIC,
1705
                       selected=self.op.output_fields)
1706

    
1707
    # Lock all nodes, in shared mode
1708
    # Temporary removal of locks, should be reverted later
1709
    # TODO: reintroduce locks when they are lighter-weight
1710
    self.needed_locks = {}
1711
    #self.share_locks[locking.LEVEL_NODE] = 1
1712
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1713

    
1714
  def CheckPrereq(self):
1715
    """Check prerequisites.
1716

1717
    """
1718

    
1719
  @staticmethod
1720
  def _DiagnoseByOS(node_list, rlist):
1721
    """Remaps a per-node return list into an a per-os per-node dictionary
1722

1723
    @param node_list: a list with the names of all nodes
1724
    @param rlist: a map with node names as keys and OS objects as values
1725

1726
    @rtype: dict
1727
    @return: a dictionary with osnames as keys and as value another map, with
1728
        nodes as keys and list of OS objects as values, eg::
1729

1730
          {"debian-etch": {"node1": [<object>,...],
1731
                           "node2": [<object>,]}
1732
          }
1733

1734
    """
1735
    all_os = {}
1736
    # we build here the list of nodes that didn't fail the RPC (at RPC
1737
    # level), so that nodes with a non-responding node daemon don't
1738
    # make all OSes invalid
1739
    good_nodes = [node_name for node_name in rlist
1740
                  if not rlist[node_name].failed]
1741
    for node_name, nr in rlist.iteritems():
1742
      if nr.failed or not nr.data:
1743
        continue
1744
      for os_obj in nr.data:
1745
        if os_obj.name not in all_os:
1746
          # build a list of nodes for this os containing empty lists
1747
          # for each node in node_list
1748
          all_os[os_obj.name] = {}
1749
          for nname in good_nodes:
1750
            all_os[os_obj.name][nname] = []
1751
        all_os[os_obj.name][node_name].append(os_obj)
1752
    return all_os
1753

    
1754
  def Exec(self, feedback_fn):
1755
    """Compute the list of OSes.
1756

1757
    """
1758
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1759
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1760
    if node_data == False:
1761
      raise errors.OpExecError("Can't gather the list of OSes")
1762
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1763
    output = []
1764
    for os_name, os_data in pol.iteritems():
1765
      row = []
1766
      for field in self.op.output_fields:
1767
        if field == "name":
1768
          val = os_name
1769
        elif field == "valid":
1770
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1771
        elif field == "node_status":
1772
          val = {}
1773
          for node_name, nos_list in os_data.iteritems():
1774
            val[node_name] = [(v.status, v.path) for v in nos_list]
1775
        else:
1776
          raise errors.ParameterError(field)
1777
        row.append(val)
1778
      output.append(row)
1779

    
1780
    return output
1781

    
1782

    
1783
class LURemoveNode(LogicalUnit):
1784
  """Logical unit for removing a node.
1785

1786
  """
1787
  HPATH = "node-remove"
1788
  HTYPE = constants.HTYPE_NODE
1789
  _OP_REQP = ["node_name"]
1790

    
1791
  def BuildHooksEnv(self):
1792
    """Build hooks env.
1793

1794
    This doesn't run on the target node in the pre phase as a failed
1795
    node would then be impossible to remove.
1796

1797
    """
1798
    env = {
1799
      "OP_TARGET": self.op.node_name,
1800
      "NODE_NAME": self.op.node_name,
1801
      }
1802
    all_nodes = self.cfg.GetNodeList()
1803
    all_nodes.remove(self.op.node_name)
1804
    return env, all_nodes, all_nodes
1805

    
1806
  def CheckPrereq(self):
1807
    """Check prerequisites.
1808

1809
    This checks:
1810
     - the node exists in the configuration
1811
     - it does not have primary or secondary instances
1812
     - it's not the master
1813

1814
    Any errors are signaled by raising errors.OpPrereqError.
1815

1816
    """
1817
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1818
    if node is None:
1819
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1820

    
1821
    instance_list = self.cfg.GetInstanceList()
1822

    
1823
    masternode = self.cfg.GetMasterNode()
1824
    if node.name == masternode:
1825
      raise errors.OpPrereqError("Node is the master node,"
1826
                                 " you need to failover first.")
1827

    
1828
    for instance_name in instance_list:
1829
      instance = self.cfg.GetInstanceInfo(instance_name)
1830
      if node.name in instance.all_nodes:
1831
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1832
                                   " please remove first." % instance_name)
1833
    self.op.node_name = node.name
1834
    self.node = node
1835

    
1836
  def Exec(self, feedback_fn):
1837
    """Removes the node from the cluster.
1838

1839
    """
1840
    node = self.node
1841
    logging.info("Stopping the node daemon and removing configs from node %s",
1842
                 node.name)
1843

    
1844
    self.context.RemoveNode(node.name)
1845

    
1846
    self.rpc.call_node_leave_cluster(node.name)
1847

    
1848
    # Promote nodes to master candidate as needed
1849
    _AdjustCandidatePool(self)
1850

    
1851

    
1852
class LUQueryNodes(NoHooksLU):
1853
  """Logical unit for querying nodes.
1854

1855
  """
1856
  _OP_REQP = ["output_fields", "names", "use_locking"]
1857
  REQ_BGL = False
1858
  _FIELDS_DYNAMIC = utils.FieldSet(
1859
    "dtotal", "dfree",
1860
    "mtotal", "mnode", "mfree",
1861
    "bootid",
1862
    "ctotal", "cnodes", "csockets",
1863
    )
1864

    
1865
  _FIELDS_STATIC = utils.FieldSet(
1866
    "name", "pinst_cnt", "sinst_cnt",
1867
    "pinst_list", "sinst_list",
1868
    "pip", "sip", "tags",
1869
    "serial_no",
1870
    "master_candidate",
1871
    "master",
1872
    "offline",
1873
    "drained",
1874
    "role",
1875
    )
1876

    
1877
  def ExpandNames(self):
1878
    _CheckOutputFields(static=self._FIELDS_STATIC,
1879
                       dynamic=self._FIELDS_DYNAMIC,
1880
                       selected=self.op.output_fields)
1881

    
1882
    self.needed_locks = {}
1883
    self.share_locks[locking.LEVEL_NODE] = 1
1884

    
1885
    if self.op.names:
1886
      self.wanted = _GetWantedNodes(self, self.op.names)
1887
    else:
1888
      self.wanted = locking.ALL_SET
1889

    
1890
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1891
    self.do_locking = self.do_node_query and self.op.use_locking
1892
    if self.do_locking:
1893
      # if we don't request only static fields, we need to lock the nodes
1894
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1895

    
1896

    
1897
  def CheckPrereq(self):
1898
    """Check prerequisites.
1899

1900
    """
1901
    # The validation of the node list is done in the _GetWantedNodes,
1902
    # if non empty, and if empty, there's no validation to do
1903
    pass
1904

    
1905
  def Exec(self, feedback_fn):
1906
    """Computes the list of nodes and their attributes.
1907

1908
    """
1909
    all_info = self.cfg.GetAllNodesInfo()
1910
    if self.do_locking:
1911
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1912
    elif self.wanted != locking.ALL_SET:
1913
      nodenames = self.wanted
1914
      missing = set(nodenames).difference(all_info.keys())
1915
      if missing:
1916
        raise errors.OpExecError(
1917
          "Some nodes were removed before retrieving their data: %s" % missing)
1918
    else:
1919
      nodenames = all_info.keys()
1920

    
1921
    nodenames = utils.NiceSort(nodenames)
1922
    nodelist = [all_info[name] for name in nodenames]
1923

    
1924
    # begin data gathering
1925

    
1926
    if self.do_node_query:
1927
      live_data = {}
1928
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1929
                                          self.cfg.GetHypervisorType())
1930
      for name in nodenames:
1931
        nodeinfo = node_data[name]
1932
        if not nodeinfo.failed and nodeinfo.data:
1933
          nodeinfo = nodeinfo.data
1934
          fn = utils.TryConvert
1935
          live_data[name] = {
1936
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1937
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1938
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1939
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1940
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1941
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1942
            "bootid": nodeinfo.get('bootid', None),
1943
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1944
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1945
            }
1946
        else:
1947
          live_data[name] = {}
1948
    else:
1949
      live_data = dict.fromkeys(nodenames, {})
1950

    
1951
    node_to_primary = dict([(name, set()) for name in nodenames])
1952
    node_to_secondary = dict([(name, set()) for name in nodenames])
1953

    
1954
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1955
                             "sinst_cnt", "sinst_list"))
1956
    if inst_fields & frozenset(self.op.output_fields):
1957
      instancelist = self.cfg.GetInstanceList()
1958

    
1959
      for instance_name in instancelist:
1960
        inst = self.cfg.GetInstanceInfo(instance_name)
1961
        if inst.primary_node in node_to_primary:
1962
          node_to_primary[inst.primary_node].add(inst.name)
1963
        for secnode in inst.secondary_nodes:
1964
          if secnode in node_to_secondary:
1965
            node_to_secondary[secnode].add(inst.name)
1966

    
1967
    master_node = self.cfg.GetMasterNode()
1968

    
1969
    # end data gathering
1970

    
1971
    output = []
1972
    for node in nodelist:
1973
      node_output = []
1974
      for field in self.op.output_fields:
1975
        if field == "name":
1976
          val = node.name
1977
        elif field == "pinst_list":
1978
          val = list(node_to_primary[node.name])
1979
        elif field == "sinst_list":
1980
          val = list(node_to_secondary[node.name])
1981
        elif field == "pinst_cnt":
1982
          val = len(node_to_primary[node.name])
1983
        elif field == "sinst_cnt":
1984
          val = len(node_to_secondary[node.name])
1985
        elif field == "pip":
1986
          val = node.primary_ip
1987
        elif field == "sip":
1988
          val = node.secondary_ip
1989
        elif field == "tags":
1990
          val = list(node.GetTags())
1991
        elif field == "serial_no":
1992
          val = node.serial_no
1993
        elif field == "master_candidate":
1994
          val = node.master_candidate
1995
        elif field == "master":
1996
          val = node.name == master_node
1997
        elif field == "offline":
1998
          val = node.offline
1999
        elif field == "drained":
2000
          val = node.drained
2001
        elif self._FIELDS_DYNAMIC.Matches(field):
2002
          val = live_data[node.name].get(field, None)
2003
        elif field == "role":
2004
          if node.name == master_node:
2005
            val = "M"
2006
          elif node.master_candidate:
2007
            val = "C"
2008
          elif node.drained:
2009
            val = "D"
2010
          elif node.offline:
2011
            val = "O"
2012
          else:
2013
            val = "R"
2014
        else:
2015
          raise errors.ParameterError(field)
2016
        node_output.append(val)
2017
      output.append(node_output)
2018

    
2019
    return output
2020

    
2021

    
2022
class LUQueryNodeVolumes(NoHooksLU):
2023
  """Logical unit for getting volumes on node(s).
2024

2025
  """
2026
  _OP_REQP = ["nodes", "output_fields"]
2027
  REQ_BGL = False
2028
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2029
  _FIELDS_STATIC = utils.FieldSet("node")
2030

    
2031
  def ExpandNames(self):
2032
    _CheckOutputFields(static=self._FIELDS_STATIC,
2033
                       dynamic=self._FIELDS_DYNAMIC,
2034
                       selected=self.op.output_fields)
2035

    
2036
    self.needed_locks = {}
2037
    self.share_locks[locking.LEVEL_NODE] = 1
2038
    if not self.op.nodes:
2039
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2040
    else:
2041
      self.needed_locks[locking.LEVEL_NODE] = \
2042
        _GetWantedNodes(self, self.op.nodes)
2043

    
2044
  def CheckPrereq(self):
2045
    """Check prerequisites.
2046

2047
    This checks that the fields required are valid output fields.
2048

2049
    """
2050
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2051

    
2052
  def Exec(self, feedback_fn):
2053
    """Computes the list of nodes and their attributes.
2054

2055
    """
2056
    nodenames = self.nodes
2057
    volumes = self.rpc.call_node_volumes(nodenames)
2058

    
2059
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2060
             in self.cfg.GetInstanceList()]
2061

    
2062
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2063

    
2064
    output = []
2065
    for node in nodenames:
2066
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2067
        continue
2068

    
2069
      node_vols = volumes[node].data[:]
2070
      node_vols.sort(key=lambda vol: vol['dev'])
2071

    
2072
      for vol in node_vols:
2073
        node_output = []
2074
        for field in self.op.output_fields:
2075
          if field == "node":
2076
            val = node
2077
          elif field == "phys":
2078
            val = vol['dev']
2079
          elif field == "vg":
2080
            val = vol['vg']
2081
          elif field == "name":
2082
            val = vol['name']
2083
          elif field == "size":
2084
            val = int(float(vol['size']))
2085
          elif field == "instance":
2086
            for inst in ilist:
2087
              if node not in lv_by_node[inst]:
2088
                continue
2089
              if vol['name'] in lv_by_node[inst][node]:
2090
                val = inst.name
2091
                break
2092
            else:
2093
              val = '-'
2094
          else:
2095
            raise errors.ParameterError(field)
2096
          node_output.append(str(val))
2097

    
2098
        output.append(node_output)
2099

    
2100
    return output
2101

    
2102

    
2103
class LUAddNode(LogicalUnit):
2104
  """Logical unit for adding node to the cluster.
2105

2106
  """
2107
  HPATH = "node-add"
2108
  HTYPE = constants.HTYPE_NODE
2109
  _OP_REQP = ["node_name"]
2110

    
2111
  def BuildHooksEnv(self):
2112
    """Build hooks env.
2113

2114
    This will run on all nodes before, and on all nodes + the new node after.
2115

2116
    """
2117
    env = {
2118
      "OP_TARGET": self.op.node_name,
2119
      "NODE_NAME": self.op.node_name,
2120
      "NODE_PIP": self.op.primary_ip,
2121
      "NODE_SIP": self.op.secondary_ip,
2122
      }
2123
    nodes_0 = self.cfg.GetNodeList()
2124
    nodes_1 = nodes_0 + [self.op.node_name, ]
2125
    return env, nodes_0, nodes_1
2126

    
2127
  def CheckPrereq(self):
2128
    """Check prerequisites.
2129

2130
    This checks:
2131
     - the new node is not already in the config
2132
     - it is resolvable
2133
     - its parameters (single/dual homed) matches the cluster
2134

2135
    Any errors are signaled by raising errors.OpPrereqError.
2136

2137
    """
2138
    node_name = self.op.node_name
2139
    cfg = self.cfg
2140

    
2141
    dns_data = utils.HostInfo(node_name)
2142

    
2143
    node = dns_data.name
2144
    primary_ip = self.op.primary_ip = dns_data.ip
2145
    secondary_ip = getattr(self.op, "secondary_ip", None)
2146
    if secondary_ip is None:
2147
      secondary_ip = primary_ip
2148
    if not utils.IsValidIP(secondary_ip):
2149
      raise errors.OpPrereqError("Invalid secondary IP given")
2150
    self.op.secondary_ip = secondary_ip
2151

    
2152
    node_list = cfg.GetNodeList()
2153
    if not self.op.readd and node in node_list:
2154
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2155
                                 node)
2156
    elif self.op.readd and node not in node_list:
2157
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2158

    
2159
    for existing_node_name in node_list:
2160
      existing_node = cfg.GetNodeInfo(existing_node_name)
2161

    
2162
      if self.op.readd and node == existing_node_name:
2163
        if (existing_node.primary_ip != primary_ip or
2164
            existing_node.secondary_ip != secondary_ip):
2165
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2166
                                     " address configuration as before")
2167
        continue
2168

    
2169
      if (existing_node.primary_ip == primary_ip or
2170
          existing_node.secondary_ip == primary_ip or
2171
          existing_node.primary_ip == secondary_ip or
2172
          existing_node.secondary_ip == secondary_ip):
2173
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2174
                                   " existing node %s" % existing_node.name)
2175

    
2176
    # check that the type of the node (single versus dual homed) is the
2177
    # same as for the master
2178
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2179
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2180
    newbie_singlehomed = secondary_ip == primary_ip
2181
    if master_singlehomed != newbie_singlehomed:
2182
      if master_singlehomed:
2183
        raise errors.OpPrereqError("The master has no private ip but the"
2184
                                   " new node has one")
2185
      else:
2186
        raise errors.OpPrereqError("The master has a private ip but the"
2187
                                   " new node doesn't have one")
2188

    
2189
    # checks reachability
2190
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2191
      raise errors.OpPrereqError("Node not reachable by ping")
2192

    
2193
    if not newbie_singlehomed:
2194
      # check reachability from my secondary ip to newbie's secondary ip
2195
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2196
                           source=myself.secondary_ip):
2197
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2198
                                   " based ping to noded port")
2199

    
2200
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2201
    if self.op.readd:
2202
      exceptions = [node]
2203
    else:
2204
      exceptions = []
2205
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2206
    # the new node will increase mc_max with one, so:
2207
    mc_max = min(mc_max + 1, cp_size)
2208
    self.master_candidate = mc_now < mc_max
2209

    
2210
    if self.op.readd:
2211
      self.new_node = self.cfg.GetNodeInfo(node)
2212
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2213
    else:
2214
      self.new_node = objects.Node(name=node,
2215
                                   primary_ip=primary_ip,
2216
                                   secondary_ip=secondary_ip,
2217
                                   master_candidate=self.master_candidate,
2218
                                   offline=False, drained=False)
2219

    
2220
  def Exec(self, feedback_fn):
2221
    """Adds the new node to the cluster.
2222

2223
    """
2224
    new_node = self.new_node
2225
    node = new_node.name
2226

    
2227
    # for re-adds, reset the offline/drained/master-candidate flags;
2228
    # we need to reset here, otherwise offline would prevent RPC calls
2229
    # later in the procedure; this also means that if the re-add
2230
    # fails, we are left with a non-offlined, broken node
2231
    if self.op.readd:
2232
      new_node.drained = new_node.offline = False
2233
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2234
      # if we demote the node, we do cleanup later in the procedure
2235
      new_node.master_candidate = self.master_candidate
2236

    
2237
    # notify the user about any possible mc promotion
2238
    if new_node.master_candidate:
2239
      self.LogInfo("Node will be a master candidate")
2240

    
2241
    # check connectivity
2242
    result = self.rpc.call_version([node])[node]
2243
    result.Raise()
2244
    if result.data:
2245
      if constants.PROTOCOL_VERSION == result.data:
2246
        logging.info("Communication to node %s fine, sw version %s match",
2247
                     node, result.data)
2248
      else:
2249
        raise errors.OpExecError("Version mismatch master version %s,"
2250
                                 " node version %s" %
2251
                                 (constants.PROTOCOL_VERSION, result.data))
2252
    else:
2253
      raise errors.OpExecError("Cannot get version from the new node")
2254

    
2255
    # setup ssh on node
2256
    logging.info("Copy ssh key to node %s", node)
2257
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2258
    keyarray = []
2259
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2260
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2261
                priv_key, pub_key]
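    # the key files are read in this exact order, matching the argument
    # order expected by call_node_add below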
2262

    
2263
    for i in keyfiles:
2264
      f = open(i, 'r')
2265
      try:
2266
        keyarray.append(f.read())
2267
      finally:
2268
        f.close()
2269

    
2270
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2271
                                    keyarray[2],
2272
                                    keyarray[3], keyarray[4], keyarray[5])
2273

    
2274
    msg = result.RemoteFailMsg()
2275
    if msg:
2276
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2277
                               " new node: %s" % msg)
2278

    
2279
    # Add node to our /etc/hosts, and add key to known_hosts
2280
    utils.AddHostToEtcHosts(new_node.name)
2281

    
2282
    if new_node.secondary_ip != new_node.primary_ip:
2283
      result = self.rpc.call_node_has_ip_address(new_node.name,
2284
                                                 new_node.secondary_ip)
2285
      if result.failed or not result.data:
2286
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2287
                                 " you gave (%s). Please fix and re-run this"
2288
                                 " command." % new_node.secondary_ip)
2289

    
2290
    node_verify_list = [self.cfg.GetMasterNode()]
2291
    node_verify_param = {
2292
      'nodelist': [node],
2293
      # TODO: do a node-net-test as well?
2294
    }
2295

    
2296
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2297
                                       self.cfg.GetClusterName())
2298
    for verifier in node_verify_list:
2299
      if result[verifier].failed or not result[verifier].data:
2300
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2301
                                 " for remote verification" % verifier)
2302
      if result[verifier].data['nodelist']:
2303
        for failed in result[verifier].data['nodelist']:
2304
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2305
                      (verifier, result[verifier].data['nodelist'][failed]))
2306
        raise errors.OpExecError("ssh/hostname verification failed.")
2307

    
2308
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2309
    # including the node just added
2310
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2311
    dist_nodes = self.cfg.GetNodeList()
2312
    if not self.op.readd:
2313
      dist_nodes.append(node)
2314
    if myself.name in dist_nodes:
2315
      dist_nodes.remove(myself.name)
2316

    
2317
    logging.debug("Copying hosts and known_hosts to all nodes")
2318
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2319
      result = self.rpc.call_upload_file(dist_nodes, fname)
2320
      for to_node, to_result in result.iteritems():
2321
        if to_result.failed or not to_result.data:
2322
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2323

    
2324
    to_copy = []
2325
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2326
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2327
      to_copy.append(constants.VNC_PASSWORD_FILE)
2328

    
2329
    for fname in to_copy:
2330
      result = self.rpc.call_upload_file([node], fname)
2331
      if result[node].failed or not result[node].data:
2332
        logging.error("Could not copy file %s to node %s", fname, node)
2333

    
2334
    if self.op.readd:
2335
      self.context.ReaddNode(new_node)
2336
      # make sure we redistribute the config
2337
      self.cfg.Update(new_node)
2338
      # and make sure the new node will not have old files around
2339
      if not new_node.master_candidate:
2340
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2341
        msg = result.RemoteFailMsg()
2342
        if msg:
2343
          self.LogWarning("Node failed to demote itself from master"
2344
                          " candidate status: %s" % msg)
2345
    else:
2346
      self.context.AddNode(new_node)
2347

    
2348

    
2349
class LUSetNodeParams(LogicalUnit):
2350
  """Modifies the parameters of a node.
2351

2352
  """
2353
  HPATH = "node-modify"
2354
  HTYPE = constants.HTYPE_NODE
2355
  _OP_REQP = ["node_name"]
2356
  REQ_BGL = False
2357

    
2358
  def CheckArguments(self):
2359
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2360
    if node_name is None:
2361
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2362
    self.op.node_name = node_name
2363
    _CheckBooleanOpField(self.op, 'master_candidate')
2364
    _CheckBooleanOpField(self.op, 'offline')
2365
    _CheckBooleanOpField(self.op, 'drained')
2366
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2367
    if all_mods.count(None) == 3:
2368
      raise errors.OpPrereqError("Please pass at least one modification")
2369
    if all_mods.count(True) > 1:
2370
      raise errors.OpPrereqError("Can't set the node into more than one"
2371
                                 " state at the same time")
2372

    
2373
  def ExpandNames(self):
2374
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2375

    
2376
  def BuildHooksEnv(self):
2377
    """Build hooks env.
2378

2379
    This runs on the master node.
2380

2381
    """
2382
    env = {
2383
      "OP_TARGET": self.op.node_name,
2384
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2385
      "OFFLINE": str(self.op.offline),
2386
      "DRAINED": str(self.op.drained),
2387
      }
2388
    nl = [self.cfg.GetMasterNode(),
2389
          self.op.node_name]
2390
    return env, nl, nl
2391

    
2392
  def CheckPrereq(self):
2393
    """Check prerequisites.
2394

2395
    This only checks the instance list against the existing names.
2396

2397
    """
2398
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2399

    
2400
    if ((self.op.master_candidate == False or self.op.offline == True or
2401
         self.op.drained == True) and node.master_candidate):
2402
      # we will demote the node from master_candidate
2403
      if self.op.node_name == self.cfg.GetMasterNode():
2404
        raise errors.OpPrereqError("The master node has to be a"
2405
                                   " master candidate, online and not drained")
2406
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2407
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2408
      if num_candidates <= cp_size:
2409
        msg = ("Not enough master candidates (desired"
2410
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2411
        if self.op.force:
2412
          self.LogWarning(msg)
2413
        else:
2414
          raise errors.OpPrereqError(msg)
2415

    
2416
    if (self.op.master_candidate == True and
2417
        ((node.offline and not self.op.offline == False) or
2418
         (node.drained and not self.op.drained == False))):
2419
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2420
                                 " to master_candidate" % node.name)
2421

    
2422
    return
2423

    
2424
  def Exec(self, feedback_fn):
2425
    """Modifies a node.
2426

2427
    """
2428
    node = self.node
2429

    
2430
    result = []
2431
    changed_mc = False
2432

    
2433
    if self.op.offline is not None:
2434
      node.offline = self.op.offline
2435
      result.append(("offline", str(self.op.offline)))
2436
      if self.op.offline == True:
2437
        if node.master_candidate:
2438
          node.master_candidate = False
2439
          changed_mc = True
2440
          result.append(("master_candidate", "auto-demotion due to offline"))
2441
        if node.drained:
2442
          node.drained = False
2443
          result.append(("drained", "clear drained status due to offline"))
2444

    
2445
    if self.op.master_candidate is not None:
2446
      node.master_candidate = self.op.master_candidate
2447
      changed_mc = True
2448
      result.append(("master_candidate", str(self.op.master_candidate)))
2449
      if self.op.master_candidate == False:
2450
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2451
        msg = rrc.RemoteFailMsg()
2452
        if msg:
2453
          self.LogWarning("Node failed to demote itself: %s" % msg)
2454

    
2455
    if self.op.drained is not None:
2456
      node.drained = self.op.drained
2457
      result.append(("drained", str(self.op.drained)))
2458
      if self.op.drained == True:
2459
        if node.master_candidate:
2460
          node.master_candidate = False
2461
          changed_mc = True
2462
          result.append(("master_candidate", "auto-demotion due to drain"))
2463
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2464
          msg = rrc.RemoteFailMsg()
2465
          if msg:
2466
            self.LogWarning("Node failed to demote itself: %s" % msg)
2467
        if node.offline:
2468
          node.offline = False
2469
          result.append(("offline", "clear offline status due to drain"))
2470

    
2471
    # this will trigger configuration file update, if needed
2472
    self.cfg.Update(node)
2473
    # this will trigger job queue propagation or cleanup
2474
    if changed_mc:
2475
      self.context.ReaddNode(node)
2476

    
2477
    return result
2478

    
2479

    
2480
class LUQueryClusterInfo(NoHooksLU):
2481
  """Query cluster configuration.
2482

2483
  """
2484
  _OP_REQP = []
2485
  REQ_BGL = False
2486

    
2487
  def ExpandNames(self):
2488
    self.needed_locks = {}
2489

    
2490
  def CheckPrereq(self):
2491
    """No prerequsites needed for this LU.
2492

2493
    """
2494
    pass
2495

    
2496
  def Exec(self, feedback_fn):
2497
    """Return cluster config.
2498

2499
    """
2500
    cluster = self.cfg.GetClusterInfo()
2501
    result = {
2502
      "software_version": constants.RELEASE_VERSION,
2503
      "protocol_version": constants.PROTOCOL_VERSION,
2504
      "config_version": constants.CONFIG_VERSION,
2505
      "os_api_version": constants.OS_API_VERSION,
2506
      "export_version": constants.EXPORT_VERSION,
2507
      "architecture": (platform.architecture()[0], platform.machine()),
2508
      "name": cluster.cluster_name,
2509
      "master": cluster.master_node,
2510
      "default_hypervisor": cluster.default_hypervisor,
2511
      "enabled_hypervisors": cluster.enabled_hypervisors,
2512
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor])
2513
                        for hypervisor_name in cluster.enabled_hypervisors]),
2514
      "beparams": cluster.beparams,
2515
      "candidate_pool_size": cluster.candidate_pool_size,
2516
      "default_bridge": cluster.default_bridge,
2517
      "master_netdev": cluster.master_netdev,
2518
      "volume_group_name": cluster.volume_group_name,
2519
      "file_storage_dir": cluster.file_storage_dir,
2520
      }
2521

    
2522
    return result
2523

    
2524

    
2525
class LUQueryConfigValues(NoHooksLU):
2526
  """Return configuration values.
2527

2528
  """
2529
  _OP_REQP = []
2530
  REQ_BGL = False
2531
  _FIELDS_DYNAMIC = utils.FieldSet()
2532
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2533

    
2534
  def ExpandNames(self):
2535
    self.needed_locks = {}
2536

    
2537
    _CheckOutputFields(static=self._FIELDS_STATIC,
2538
                       dynamic=self._FIELDS_DYNAMIC,
2539
                       selected=self.op.output_fields)
2540

    
2541
  def CheckPrereq(self):
2542
    """No prerequisites.
2543

2544
    """
2545
    pass
2546

    
2547
  def Exec(self, feedback_fn):
2548
    """Dump a representation of the cluster config to the standard output.
2549

2550
    """
2551
    values = []
2552
    for field in self.op.output_fields:
2553
      if field == "cluster_name":
2554
        entry = self.cfg.GetClusterName()
2555
      elif field == "master_node":
2556
        entry = self.cfg.GetMasterNode()
2557
      elif field == "drain_flag":
2558
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2559
      else:
2560
        raise errors.ParameterError(field)
2561
      values.append(entry)
2562
    return values
2563

    
2564

    
2565
class LUActivateInstanceDisks(NoHooksLU):
2566
  """Bring up an instance's disks.
2567

2568
  """
2569
  _OP_REQP = ["instance_name"]
2570
  REQ_BGL = False
2571

    
2572
  def ExpandNames(self):
2573
    self._ExpandAndLockInstance()
2574
    self.needed_locks[locking.LEVEL_NODE] = []
2575
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2576

    
2577
  def DeclareLocks(self, level):
2578
    if level == locking.LEVEL_NODE:
2579
      self._LockInstancesNodes()
2580

    
2581
  def CheckPrereq(self):
2582
    """Check prerequisites.
2583

2584
    This checks that the instance is in the cluster.
2585

2586
    """
2587
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2588
    assert self.instance is not None, \
2589
      "Cannot retrieve locked instance %s" % self.op.instance_name
2590
    _CheckNodeOnline(self, self.instance.primary_node)
2591

    
2592
  def Exec(self, feedback_fn):
2593
    """Activate the disks.
2594

2595
    """
2596
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2597
    if not disks_ok:
2598
      raise errors.OpExecError("Cannot activate block devices")
2599

    
2600
    return disks_info
2601

    
2602

    
2603
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2604
  """Prepare the block devices for an instance.
2605

2606
  This sets up the block devices on all nodes.
2607

2608
  @type lu: L{LogicalUnit}
2609
  @param lu: the logical unit on whose behalf we execute
2610
  @type instance: L{objects.Instance}
2611
  @param instance: the instance for whose disks we assemble
2612
  @type ignore_secondaries: boolean
2613
  @param ignore_secondaries: if true, errors on secondary nodes
2614
      won't result in an error return from the function
2615
  @return: False if the operation failed, otherwise a list of
2616
      (host, instance_visible_name, node_visible_name)
2617
      with the mapping from node devices to instance devices
2618

2619
  """
2620
  device_info = []
2621
  disks_ok = True
2622
  iname = instance.name
2623
  # With the two passes mechanism we try to reduce the window of
2624
  # opportunity for the race condition of switching DRBD to primary
2625
  # before handshaking occurred, but we do not eliminate it
2626

    
2627
  # The proper fix would be to wait (with some limits) until the
2628
  # connection has been made and drbd transitions from WFConnection
2629
  # into any other network-connected state (Connected, SyncTarget,
2630
  # SyncSource, etc.)
2631

    
2632
  # 1st pass, assemble on all nodes in secondary mode
2633
  for inst_disk in instance.disks:
2634
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2635
      lu.cfg.SetDiskID(node_disk, node)
2636
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2637
      msg = result.RemoteFailMsg()
2638
      if msg:
2639
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2640
                           " (is_primary=False, pass=1): %s",
2641
                           inst_disk.iv_name, node, msg)
2642
        if not ignore_secondaries:
2643
          disks_ok = False
2644

    
2645
  # FIXME: race condition on drbd migration to primary
2646

    
2647
  # 2nd pass, do only the primary node
2648
  for inst_disk in instance.disks:
2649
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2650
      if node != instance.primary_node:
2651
        continue
2652
      lu.cfg.SetDiskID(node_disk, node)
2653
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2654
      msg = result.RemoteFailMsg()
2655
      if msg:
2656
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2657
                           " (is_primary=True, pass=2): %s",
2658
                           inst_disk.iv_name, node, msg)
2659
        disks_ok = False
2660
    device_info.append((instance.primary_node, inst_disk.iv_name,
2661
                        result.payload))
2662

    
2663
  # leave the disks configured for the primary node
2664
  # this is a workaround that would be fixed better by
2665
  # improving the logical/physical id handling
2666
  for disk in instance.disks:
2667
    lu.cfg.SetDiskID(disk, instance.primary_node)
2668

    
2669
  return disks_ok, device_info
2670

    
2671

    
2672
def _StartInstanceDisks(lu, instance, force):
2673
  """Start the disks of an instance.
2674

2675
  """
2676
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2677
                                           ignore_secondaries=force)
2678
  if not disks_ok:
2679
    _ShutdownInstanceDisks(lu, instance)
2680
    if force is not None and not force:
2681
      lu.proc.LogWarning("", hint="If the message above refers to a"
2682
                         " secondary node,"
2683
                         " you can retry the operation using '--force'.")
2684
    raise errors.OpExecError("Disk consistency error")
2685

    
2686

    
2687
class LUDeactivateInstanceDisks(NoHooksLU):
2688
  """Shutdown an instance's disks.
2689

2690
  """
2691
  _OP_REQP = ["instance_name"]
2692
  REQ_BGL = False
2693

    
2694
  def ExpandNames(self):
2695
    self._ExpandAndLockInstance()
2696
    self.needed_locks[locking.LEVEL_NODE] = []
2697
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2698

    
2699
  def DeclareLocks(self, level):
2700
    if level == locking.LEVEL_NODE:
2701
      self._LockInstancesNodes()
2702

    
2703
  def CheckPrereq(self):
2704
    """Check prerequisites.
2705

2706
    This checks that the instance is in the cluster.
2707

2708
    """
2709
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2710
    assert self.instance is not None, \
2711
      "Cannot retrieve locked instance %s" % self.op.instance_name
2712

    
2713
  def Exec(self, feedback_fn):
2714
    """Deactivate the disks
2715

2716
    """
2717
    instance = self.instance
2718
    _SafeShutdownInstanceDisks(self, instance)
2719

    
2720

    
2721
def _SafeShutdownInstanceDisks(lu, instance):
2722
  """Shutdown block devices of an instance.
2723

2724
  This function checks if an instance is running, before calling
2725
  _ShutdownInstanceDisks.
2726

2727
  """
2728
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2729
                                      [instance.hypervisor])
2730
  ins_l = ins_l[instance.primary_node]
2731
  if ins_l.failed or not isinstance(ins_l.data, list):
2732
    raise errors.OpExecError("Can't contact node '%s'" %
2733
                             instance.primary_node)
2734

    
2735
  if instance.name in ins_l.data:
2736
    raise errors.OpExecError("Instance is running, can't shutdown"
2737
                             " block devices.")
2738

    
2739
  _ShutdownInstanceDisks(lu, instance)
2740

    
2741

    
2742
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2743
  """Shutdown block devices of an instance.
2744

2745
  This does the shutdown on all nodes of the instance.
2746

2747
  If ignore_primary is true, errors on the primary node are
  ignored.
2749

2750
  """
2751
  all_result = True
2752
  for disk in instance.disks:
2753
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2754
      lu.cfg.SetDiskID(top_disk, node)
2755
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2756
      msg = result.RemoteFailMsg()
2757
      if msg:
2758
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2759
                      disk.iv_name, node, msg)
2760
        if not ignore_primary or node != instance.primary_node:
2761
          all_result = False
2762
  return all_result
2763

    
2764

    
2765
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2766
  """Checks if a node has enough free memory.
2767

2768
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2772

2773
  @type lu: C{LogicalUnit}
2774
  @param lu: a logical unit from which we get configuration data
2775
  @type node: C{str}
2776
  @param node: the node to check
2777
  @type reason: C{str}
2778
  @param reason: string to use in the error message
2779
  @type requested: C{int}
2780
  @param requested: the amount of memory in MiB to check for
2781
  @type hypervisor_name: C{str}
2782
  @param hypervisor_name: the hypervisor to ask for memory stats
2783
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2784
      we cannot check the node
2785

2786
  """
2787
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2788
  nodeinfo[node].Raise()
2789
  free_mem = nodeinfo[node].data.get('memory_free')
2790
  if not isinstance(free_mem, int):
2791
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2792
                             " was '%s'" % (node, free_mem))
2793
  if requested > free_mem:
2794
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2795
                             " needed %s MiB, available %s MiB" %
2796
                             (node, reason, requested, free_mem))
2797

    
2798

    
2799
class LUStartupInstance(LogicalUnit):
2800
  """Starts an instance.
2801

2802
  """
2803
  HPATH = "instance-start"
2804
  HTYPE = constants.HTYPE_INSTANCE
2805
  _OP_REQP = ["instance_name", "force"]
2806
  REQ_BGL = False
2807

    
2808
  def ExpandNames(self):
2809
    self._ExpandAndLockInstance()
2810

    
2811
  def BuildHooksEnv(self):
2812
    """Build hooks env.
2813

2814
    This runs on master, primary and secondary nodes of the instance.
2815

2816
    """
2817
    env = {
2818
      "FORCE": self.op.force,
2819
      }
2820
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2821
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2822
    return env, nl, nl
2823

    
2824
  def CheckPrereq(self):
2825
    """Check prerequisites.
2826

2827
    This checks that the instance is in the cluster.
2828

2829
    """
2830
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2831
    assert self.instance is not None, \
2832
      "Cannot retrieve locked instance %s" % self.op.instance_name
2833

    
2834
    # extra beparams
2835
    self.beparams = getattr(self.op, "beparams", {})
2836
    if self.beparams:
2837
      if not isinstance(self.beparams, dict):
2838
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2839
                                   " dict" % (type(self.beparams), ))
2840
      # fill the beparams dict
2841
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2842
      self.op.beparams = self.beparams
2843

    
2844
    # extra hvparams
2845
    self.hvparams = getattr(self.op, "hvparams", {})
2846
    if self.hvparams:
2847
      if not isinstance(self.hvparams, dict):
2848
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2849
                                   " dict" % (type(self.hvparams), ))
2850

    
2851
      # check hypervisor parameter syntax (locally)
2852
      cluster = self.cfg.GetClusterInfo()
2853
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2854
      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2855
                                    instance.hvparams)
2856
      filled_hvp.update(self.hvparams)
2857
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2858
      hv_type.CheckParameterSyntax(filled_hvp)
2859
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2860
      self.op.hvparams = self.hvparams
2861

    
2862
    _CheckNodeOnline(self, instance.primary_node)
2863

    
2864
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2865
    # check bridges existence
2866
    _CheckInstanceBridgesExist(self, instance)
2867

    
2868
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2869
                                              instance.name,
2870
                                              instance.hypervisor)
2871
    remote_info.Raise()
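    # only check free memory if the instance is not already running on
    # its primary node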
2872
    if not remote_info.data:
2873
      _CheckNodeFreeMemory(self, instance.primary_node,
2874
                           "starting instance %s" % instance.name,
2875
                           bep[constants.BE_MEMORY], instance.hypervisor)
2876

    
2877
  def Exec(self, feedback_fn):
2878
    """Start the instance.
2879

2880
    """
2881
    instance = self.instance
2882
    force = self.op.force
2883

    
2884
    self.cfg.MarkInstanceUp(instance.name)
2885

    
2886
    node_current = instance.primary_node
2887

    
2888
    _StartInstanceDisks(self, instance, force)
2889

    
2890
    result = self.rpc.call_instance_start(node_current, instance,
2891
                                          self.hvparams, self.beparams)
2892
    msg = result.RemoteFailMsg()
2893
    if msg:
2894
      _ShutdownInstanceDisks(self, instance)
2895
      raise errors.OpExecError("Could not start instance: %s" % msg)
2896

    
2897

    
2898
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "vcpus":
          val = i_be[constants.BE_VCPUS]
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          if instance.nics:
            val = instance.nics[0].ip
          else:
            val = None
        elif field == "bridge":
          if instance.nics:
            val = instance.nics[0].bridge
          else:
            val = None
        elif field == "mac":
          if instance.nics:
            val = instance.nics[0].mac
          else:
            val = None
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, ("Declared but unhandled variable parameter '%s'" %
                           field)
        else:
          assert False, "Declared but unhandled parameter '%s'" % field
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)

    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                           ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload