Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ b119bccb

History | View | Annotate | Download (247.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import re
30
import platform
31
import logging
32
import copy
33

    
34
from ganeti import ssh
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import serializer
42
from ganeti import ssconf
43

    
44

    
45
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  # HPATH/HTYPE identify the hooks directory and object type; None on the
  # base class means "no hooks" until a subclass redefines them
  HPATH = None
  HTYPE = None
  # opcode attributes that must be present (non-None) on self.op
  _OP_REQP = []
  # by default LUs run under the Big Ganeti Lock, i.e. fully serialized
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    # 0 = exclusive; a true value at a level means shared acquisition
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # lazily built by __GetSSH; exposed via the 'ssh' property below
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    # validate required opcode parameters before any subclass logic runs
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    # built on first use only, since not every LU needs SSH access
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possible
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, ecc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    If should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # consume the flag so accidental double recalculation is caught by the
    # assert above
    del self.recalculate_locks[locking.LEVEL_NODE]
class NoHooksLU(LogicalUnit):
  """Base class for logical units that run without any hooks.

  Deriving from this class instead of L{LogicalUnit} spares the
  subclasses from having to disable hook execution themselves, thus
  reducing duplicate code.

  """
  HPATH = None
  HTYPE = None
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type

  """
  # type check first: a string would otherwise be iterated char by char
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  expanded = []
  for short_name in nodes:
    full_name = lu.cfg.ExpandNodeName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % short_name)
    expanded.append(full_name)

  # return in a stable, human-friendly order
  return utils.NiceSort(expanded)
def _GetWantedInstances(lu, instances):
366
  """Returns list of checked and expanded instance names.
367

368
  @type lu: L{LogicalUnit}
369
  @param lu: the logical unit on whose behalf we execute
370
  @type instances: list
371
  @param instances: list of instance names or None for all instances
372
  @rtype: list
373
  @return: the list of instances, sorted
374
  @raise errors.OpPrereqError: if the instances parameter is wrong type
375
  @raise errors.OpPrereqError: if any of the passed instances is not found
376

377
  """
378
  if not isinstance(instances, list):
379
    raise errors.OpPrereqError("Invalid argument type 'instances'")
380

    
381
  if instances:
382
    wanted = []
383

    
384
    for name in instances:
385
      instance = lu.cfg.ExpandInstanceName(name)
386
      if instance is None:
387
        raise errors.OpPrereqError("No such instance name '%s'" % name)
388
      wanted.append(instance)
389

    
390
  else:
391
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392
  return wanted
393

    
394

    
395
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Raises OpPrereqError if any member of C{selected} matches neither
  the static nor the dynamic field set.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  all_fields = utils.FieldSet()
  all_fields.Extend(static)
  all_fields.Extend(dynamic)

  unknown = all_fields.NonMatching(selected)
  if unknown:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(unknown))
def _CheckBooleanOpField(op, name):
415
  """Validates boolean opcode parameters.
416

417
  This will ensure that an opcode parameter is either a boolean value,
418
  or None (but that it always exists).
419

420
  """
421
  val = getattr(op, name, None)
422
  if not (val is None or isinstance(val, bool)):
423
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424
                               (name, str(val)))
425
  setattr(op, name, val)
426

    
427

    
428
def _CheckNodeOnline(lu, node):
429
  """Ensure that a given node is online.
430

431
  @param lu: the LU on behalf of which we make the check
432
  @param node: the node to check
433
  @raise errors.OpPrereqError: if the node is offline
434

435
  """
436
  if lu.cfg.GetNodeInfo(node).offline:
437
    raise errors.OpPrereqError("Can't use offline node %s" % node)
438

    
439

    
440
def _CheckNodeNotDrained(lu, node):
441
  """Ensure that a given node is not drained.
442

443
  @param lu: the LU on behalf of which we make the check
444
  @param node: the node to check
445
  @raise errors.OpPrereqError: if the node is drained
446

447
  """
448
  if lu.cfg.GetNodeInfo(node).drained:
449
    raise errors.OpPrereqError("Can't use drained node %s" % node)
450

    
451

    
452
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453
                          memory, vcpus, nics, disk_template, disks,
454
                          bep, hvp, hypervisor_name):
455
  """Builds instance related env variables for hooks
456

457
  This builds the hook environment from individual variables.
458

459
  @type name: string
460
  @param name: the name of the instance
461
  @type primary_node: string
462
  @param primary_node: the name of the instance's primary node
463
  @type secondary_nodes: list
464
  @param secondary_nodes: list of secondary nodes as strings
465
  @type os_type: string
466
  @param os_type: the name of the instance's OS
467
  @type status: boolean
468
  @param status: the should_run status of the instance
469
  @type memory: string
470
  @param memory: the memory size of the instance
471
  @type vcpus: string
472
  @param vcpus: the count of VCPUs the instance has
473
  @type nics: list
474
  @param nics: list of tuples (ip, bridge, mac) representing
475
      the NICs the instance  has
476
  @type disk_template: string
477
  @param disk_template: the disk template of the instance
478
  @type disks: list
479
  @param disks: the list of (size, mode) pairs
480
  @type bep: dict
481
  @param bep: the backend parameters for the instance
482
  @type hvp: dict
483
  @param hvp: the hypervisor parameters for the instance
484
  @type hypervisor_name: string
485
  @param hypervisor_name: the hypervisor for the instance
486
  @rtype: dict
487
  @return: the hook environment for this instance
488

489
  """
490
  if status:
491
    str_status = "up"
492
  else:
493
    str_status = "down"
494
  env = {
495
    "OP_TARGET": name,
496
    "INSTANCE_NAME": name,
497
    "INSTANCE_PRIMARY": primary_node,
498
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499
    "INSTANCE_OS_TYPE": os_type,
500
    "INSTANCE_STATUS": str_status,
501
    "INSTANCE_MEMORY": memory,
502
    "INSTANCE_VCPUS": vcpus,
503
    "INSTANCE_DISK_TEMPLATE": disk_template,
504
    "INSTANCE_HYPERVISOR": hypervisor_name,
505
  }
506

    
507
  if nics:
508
    nic_count = len(nics)
509
    for idx, (ip, bridge, mac) in enumerate(nics):
510
      if ip is None:
511
        ip = ""
512
      env["INSTANCE_NIC%d_IP" % idx] = ip
513
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
514
      env["INSTANCE_NIC%d_MAC" % idx] = mac
515
  else:
516
    nic_count = 0
517

    
518
  env["INSTANCE_NIC_COUNT"] = nic_count
519

    
520
  if disks:
521
    disk_count = len(disks)
522
    for idx, (size, mode) in enumerate(disks):
523
      env["INSTANCE_DISK%d_SIZE" % idx] = size
524
      env["INSTANCE_DISK%d_MODE" % idx] = mode
525
  else:
526
    disk_count = 0
527

    
528
  env["INSTANCE_DISK_COUNT"] = disk_count
529

    
530
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
531
    for key, value in source.items():
532
      env["INSTANCE_%s_%s" % (kind, key)] = value
533

    
534
  return env
535

    
536

    
537
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  # keys must match the keyword arguments of _BuildInstanceHookEnv, since
  # the dict is expanded via **args below
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    # fixed: was 'hypervisor', which does not match the hypervisor_name
    # parameter of _BuildInstanceHookEnv and raised TypeError on expansion
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
def _AdjustCandidatePool(lu):
576
  """Adjust the candidate pool after node operations.
577

578
  """
579
  mod_list = lu.cfg.MaintainCandidatePool()
580
  if mod_list:
581
    lu.LogInfo("Promoted nodes to master candidate role: %s",
582
               ", ".join(node.name for node in mod_list))
583
    for name in mod_list:
584
      lu.context.ReaddNode(name)
585
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
586
  if mc_now > mc_max:
587
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
588
               (mc_now, mc_max))
589

    
590

    
591
def _CheckInstanceBridgesExist(lu, instance):
592
  """Check that the bridges needed by an instance exist.
593

594
  """
595
  # check bridges existence
596
  brlist = [nic.bridge for nic in instance.nics]
597
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
598
  result.Raise()
599
  if not result.data:
600
    raise errors.OpPrereqError("One or more target bridges %s does not"
601
                               " exist on destination node '%s'" %
602
                               (brlist, instance.primary_node))
603

    
604

    
605
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    # the master node must be the only remaining node
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    # and no instances may be left
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    @return: the name of the (former) master node

    """
    master = self.cfg.GetMasterNode()
    # turn off the master role (IP, daemons) on the master node
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    # keep backup copies of the cluster's ssh key pair before teardown
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  # verification can run concurrently with other operations
  REQ_BGL = False

  def ExpandNames(self):
    # verification needs to look at every node and instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    # read-only operation: acquire everything in shared mode
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the useddrbd minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
    @rtype: boolean
    @return: True if any problem was found on this node

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    # a protocol mismatch makes all further results meaningless, so stop here
    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version; mismatch is only a warning, not an error
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G (skipped when the cluster has no VG)
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        # files not in master_files must be present on every node
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    # NOTE(review): the loops below rebind the local name 'node' (the node
    # being verified) to each peer name; intentional but worth keeping in mind
    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    # per-hypervisor verification results (None means "ok" for that hv)
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        # every minor the config expects must be in use, and vice versa
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810
                      node_instance, feedback_fn, n_offline):
811
    """Verify an instance.
812

813
    This function checks to see if the required block devices are
814
    available on the instance's node.
815

816
    """
817
    bad = False
818

    
819
    node_current = instanceconfig.primary_node
820

    
821
    node_vol_should = {}
822
    instanceconfig.MapLVsByNode(node_vol_should)
823

    
824
    for node in node_vol_should:
825
      if node in n_offline:
826
        # ignore missing volumes on offline nodes
827
        continue
828
      for volume in node_vol_should[node]:
829
        if node not in node_vol_is or volume not in node_vol_is[node]:
830
          feedback_fn("  - ERROR: volume %s missing on node %s" %
831
                          (volume, node))
832
          bad = True
833

    
834
    if instanceconfig.admin_up:
835
      if ((node_current not in node_instance or
836
          not instance in node_instance[node_current]) and
837
          node_current not in n_offline):
838
        feedback_fn("  - ERROR: instance %s not running on node %s" %
839
                        (instance, node_current))
840
        bad = True
841

    
842
    for node in node_instance:
843
      if (not node == node_current):
844
        if instance in node_instance[node]:
845
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
846
                          (instance, node))
847
          bad = True
848

    
849
    return bad
850

    
851
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852
    """Verify if there are any unknown volumes in the cluster.
853

854
    The .os, .swap and backup volumes are ignored. All other volumes are
855
    reported as unknown.
856

857
    """
858
    bad = False
859

    
860
    for node in node_vol_is:
861
      for volume in node_vol_is[node]:
862
        if node not in node_vol_should or volume not in node_vol_should[node]:
863
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864
                      (volume, node))
865
          bad = True
866
    return bad
867

    
868
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869
    """Verify the list of running instances.
870

871
    This checks what instances are running but unknown to the cluster.
872

873
    """
874
    bad = False
875
    for node in node_instance:
876
      for runninginstance in node_instance[node]:
877
        if runninginstance not in instancelist:
878
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879
                          (runninginstance, node))
880
          bad = True
881
    return bad
882

    
883
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that every node which acts as a secondary has enough free
    memory to start all auto-balanced instances it would inherit should
    any single one of their primary nodes fail.

    """
    failed_check = False

    for node, nodeinfo in node_info.items():
      # Each (primary, [instances]) pair describes the load this node
      # would have to absorb if that primary died.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: down instances are counted too, since someone might
      # want to start them even in the event of a node failure.
      cluster_info = self.cfg.GetClusterInfo()
      for prinode, instances in nodeinfo['sinst-by-pnode'].items():
        needed_mem = 0
        for iname in instances:
          bep = cluster_info.FillBE(instance_cfg[iname])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          failed_check = True
    return failed_check
  def CheckPrereq(self):
    """Check prerequisites.

    Turn the requested skip list into a frozenset and ensure every
    entry names one of the optional (skippable) verification checks.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    all_known = constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set)
    if not all_known:
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks only run in the post phase; a hook failure is
    logged in the verify output and makes the verification fail.

    """
    env = {"CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())}
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
    # no pre-hook nodes; post hooks run on every node in the cluster
    return env, [], self.cfg.GetNodeList()
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    @param feedback_fn: callable taking one string; used to report
        progress, errors and notices to the caller
    @return: True if no problem was found, False otherwise

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    # gather the cluster-wide data all later checks work on
    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}    # node -> {lv_name: lv_data} as reported by the node
    node_instance = {}  # node -> [running instance names]
    node_info = {}      # node -> memory/disk/instance-placement summary
    instance_cfg = {}   # instance name -> config object

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    # build the per-node verification request and issue one RPC to all nodes
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      # LVM/DRBD checks only make sense when the cluster uses a VG
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    # per-node verification pass
    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        # offline nodes are skipped entirely but recorded for later notices
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      # minor -> (instance name, must_exist) for this node's DRBD devices
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      # collect this node's LV list (string result means an LVM error)
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      # NOTE(review): this rebinds the outer 'nodeinfo' list; harmless
      # because the for-loop iterator is already bound, but confusing
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    # per-instance verification pass; also fills node_info placement data
    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    # optional check, may be skipped via the opcode's skip_checks
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    # informational notices only; these do not affect the return value
    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

    # BUGFIX: return for every phase.  Previously this return was inside
    # the POST branch, so PRE-phase calls implicitly returned None and
    # silently dropped the previous Exec result.
    return lu_result
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  For every instance that is marked up and uses a network-mirrored disk
  template, checks that its logical volumes are present and active on
  the relevant nodes; also reports nodes that could not be queried.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # shared locks on all nodes and instances: we only read state
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @return: a 4-tuple of (unreachable nodes, node -> LVM error string,
        instances with inactive volumes, instance name -> list of
        (node, volume) pairs that are missing)

    """
    # the four res_* names alias the members of 'result', so appending
    # to them below fills the returned tuple in place
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # (node, volume) -> instance, for all volumes that must exist
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      # only up, network-mirrored instances need their LVs checked
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      # nothing to check, all result containers stay empty
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        # a string result signals an LVM-level error on that node
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      # pop every volume the node reports; inactive ones flag the instance
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node, both pre and post.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, rejects a no-op rename, and makes sure the
    new IP (if it changed) is not already in use on the network.
    Side effects: stores the resolved IP in self.ip and the resolved
    name back into self.op.name.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # a reachable new IP means something else already owns it
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Stops the master role, updates the cluster object and the known
    hosts file on all nodes, then restarts the master role even if the
    update failed.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        # the master already has the updated file locally
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          # best-effort: log and continue with the other nodes
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      # always try to bring the master role back up
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check whether a disk or any of its descendants is LVM-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return disk.dev_type == constants.LD_LV
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  Handles (individually optional) changes to the volume group, the
  hypervisor list/parameters, the default backend parameters and the
  master-candidate pool size.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    Normalizes candidate_pool_size to None or a positive int.

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node, both pre and post.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    Side effects: stores the validated new settings in self.cluster,
    self.new_beparams, self.new_hvparams and self.hv_list for Exec.

    """
    # disabling LVM (empty vg_name) is only allowed with no LVM instances
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # validate the (non-empty, known) enabled hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member")
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" % invalid_hvs)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    Applies the values validated in CheckPrereq and saves the cluster
    object.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        # an empty string means "disable LVM"; store None
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self)

    self.cfg.Update(self.cluster)
class LURedistributeConfig(NoHooksLU):
  """Force a push of the cluster configuration to all nodes.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # a shared lock on every node is enough for redistribution
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """No prerequisites to check.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    Re-saving the (unchanged) cluster object triggers the push.

    """
    cluster = self.cfg.GetClusterInfo()
    self.cfg.Update(cluster)
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Repeatedly queries the primary node for the mirror status of all
  the instance's disks, logging progress, until all disks report no
  remaining sync time (or, with C{oneshot}, after a single pass).

  @param lu: the logical unit on whose behalf we operate (used for
      config access, RPC calls and user feedback)
  @param instance: the instance whose disks we wait for
  @param oneshot: if True, report the current status once and return
      instead of looping until the sync finishes
  @param unlock: unused in this function -- NOTE(review): appears to
      be a leftover parameter, confirm with callers before removing
  @return: True if the disks ended up non-degraded, False otherwise

  """
  if not instance.disks:
    # nothing to sync
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  # make sure the disks carry the IDs valid on the primary node
  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0  # consecutive RPC failures; abort after 10
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      # transient RPC failure: retry a few times before giving up
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i, mstat in enumerate(rstats):
      # mstat entries are positional, matching instance.disks
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # a degraded disk with no progress percentage counts as
      # (cumulatively) degraded for the return value
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # this disk is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    # sleep proportionally to the longest estimate, capped at a minute
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1665
  """Check that mirrors are not degraded.
1666

1667
  The ldisk parameter, if True, will change the test from the
1668
  is_degraded attribute (which represents overall non-ok status for
1669
  the device(s)) to the ldisk (representing the local storage status).
1670

1671
  """
1672
  lu.cfg.SetDiskID(dev, node)
1673
  if ldisk:
1674
    idx = 6
1675
  else:
1676
    idx = 5
1677

    
1678
  result = True
1679
  if on_primary or dev.AssembleOnSecondary():
1680
    rstats = lu.rpc.call_blockdev_find(node, dev)
1681
    msg = rstats.RemoteFailMsg()
1682
    if msg:
1683
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1684
      result = False
1685
    elif not rstats.payload:
1686
      lu.LogWarning("Can't find disk on node %s", node)
1687
      result = False
1688
    else:
1689
      result = result and (not rstats.payload[idx])
1690
  if dev.children:
1691
    for child in dev.children:
1692
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1693

    
1694
  return result
1695

    
1696

    
1697
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  # no static fields: everything is computed from the per-node RPC data
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    Nothing to check; the node list is validated implicitly.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].failed]
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    @return: a list of rows, one per OS, each row holding the values
        of the requested output fields in order

    """
    # list() instead of a copying comprehension: same result, clearer
    valid_nodes = list(self.cfg.GetOnlineNodeList())
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    # identity comparison: only the legacy literal-False failure marker
    # should trigger this, not any falsy value (PEP 8)
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # an OS is valid if every good node returned at least one
          # (truthy) OS object for it
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    # the node being removed runs no hooks
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # fixed: was the deprecated comma-form raise statement, which is
      # inconsistent with the rest of the file and invalid in Python 3
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    # remember the expanded name and node object for Exec
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  # fields that require a live RPC query to the nodes
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  # fields that can be answered from the configuration alone
  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    "role",
    )

  def ExpandNames(self):
    """Validate the output fields and compute the locking strategy."""
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # dynamic fields require querying the nodes; locking is only
    # needed (and only honored) in that case
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      # locked case: the acquired locks define the node set
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      # unlocked, explicit names: nodes may have disappeared meanwhile
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      # NOTE: all keys share the same (empty) dict object; safe here
      # because live_data values are only ever read below
      live_data = dict.fromkeys(nodenames, {})

    # node name -> set of instance names, for the pinst_*/sinst_* fields
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          # dynamic fields come from the live RPC data gathered above
          val = live_data[node.name].get(field, None)
        elif field == "role":
          # one-letter role: Master, Candidate, Drained, Offline, Regular
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    """Validate the fields and lock the requested nodes (shared)."""
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    instances = [self.cfg.GetInstanceInfo(iname)
                 for iname in self.cfg.GetInstanceList()]

    # per-instance map of node name -> logical volumes on that node
    lv_by_node = dict((inst, inst.MapLVsByNode()) for inst in instances)

    output = []
    for node in nodenames:
      node_result = volumes.get(node, None)
      if node_result is None or node_result.failed or not node_result.data:
        # skip nodes that did not answer or returned nothing
        continue

      node_vols = sorted(node_result.data, key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the owning instance, if any
            val = '-'
            for inst in instances:
              inst_lvs = lv_by_node[inst]
              if node in inst_lvs and vol['name'] in inst_lvs[node]:
                val = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    # resolve the name; raises on failure
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed node: secondary defaults to the primary IP
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    # check for IP conflicts with every other configured node
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # decide whether the new node becomes a master candidate, based on
    # the current candidate pool occupancy
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    if self.op.readd:
      # don't count the node itself when re-adding
      exceptions = [node]
    else:
      exceptions = []
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
    # the new node will increase mc_max with one, so:
    mc_max = min(mc_max + 1, cp_size)
    self.master_candidate = mc_now < mc_max

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    # read the key files in the same order as keyfiles
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot transfer ssh keys to the"
                               " new node: %s" % msg)

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # ask the master to verify ssh/hostname connectivity to the new node
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      # on readd the node is already in the configuration list
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
      to_copy.append(constants.VNC_PASSWORD_FILE)

    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      # NOTE(review): "not result[node]" tests the truthiness of the
      # RPC result object, not its .data -- looks like it should be
      # "not result[node].data" as in the loop above; confirm
      if result[node].failed or not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      self.context.AddNode(new_node)
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  Supported flags are C{master_candidate}, C{offline} and C{drained};
  at most one of them may be set to True in a single invocation.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    # Canonicalize the node name and validate the flag combination
    # before any locks are taken.
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    # each flag must be a boolean or None (None == "leave unchanged")
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    # only the target node needs to be locked
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This verifies that demoting the node from master candidate status
    (directly, or implicitly via offline/drained) keeps enough master
    candidates, and that an offline/drained node is not promoted.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        # the pool would drop below the configured size; only a forced
        # request (self.op.force, expected on the opcode) may proceed
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    # promotion requires the node to end up online and not drained;
    # "not self.op.X == False" means the flag is not being cleared now
    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    Applies the requested flag changes to the node object, performing
    the implied auto-demotions, then saves the configuration.

    """
    node = self.node

    # list of (parameter, new value/explanation) pairs returned to the caller
    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        # an offline node cannot be a master candidate nor drained
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        # ask the node itself to drop master-candidate-only files;
        # failure is non-fatal (only logged)
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        # a drained node cannot be a master candidate nor offline
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.RemoteFailMsg()
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result
class LUQueryClusterInfo(NoHooksLU):
  """Query the cluster configuration.

  Returns a dictionary of static and configured cluster attributes.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # purely a read of the in-memory configuration: no locks required
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Assemble and return the cluster configuration summary.

    """
    cluster = self.cfg.GetClusterInfo()

    # expose hvparams only for the hypervisors that are enabled
    hv_params = {}
    for hv_name in cluster.enabled_hypervisors:
      hv_params[hv_name] = cluster.hvparams[hv_name]

    return {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": hv_params,
      "beparams": cluster.beparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "default_bridge": cluster.default_bridge,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      }
class LUQueryConfigValues(NoHooksLU):
  """Return selected configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    # no locks needed; validate the requested fields up front
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the value of each requested field, in request order.

    """
    # map each supported field to a zero-argument callable
    getters = {
      "cluster_name": self.cfg.GetClusterName,
      "master_node": self.cfg.GetMasterNode,
      "drain_flag": lambda: os.path.exists(constants.JOB_QUEUE_DRAIN_FILE),
      }
    values = []
    for field in self.op.output_fields:
      if field not in getters:
        raise errors.ParameterError(field)
      values.append(getters[field]())
    return values
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies the instance exists in the cluster and that its primary
    node is online.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Assemble the instance's disks and return the device mapping.

    """
    assembled, device_info = _AssembleInstanceDisks(self, self.instance)
    if not assembled:
      raise errors.OpExecError("Cannot activate block devices")
    return device_info
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: a (disks_ok, device_info) tuple; disks_ok is False if any
      non-ignored assembly failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.RemoteFailMsg()
      if msg:
        # failure on a secondary is fatal only if not ignore_secondaries
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.RemoteFailMsg()
      if msg:
        # primary failures always mark the assembly as failed
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    # 'result' here is the last (primary-node) RPC reply from the inner loop
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance, aborting the operation on failure.

  On assembly failure the already-started disks are shut down again
  and OpExecError is raised.

  """
  assembled, _ = _AssembleInstanceDisks(lu, instance,
                                        ignore_secondaries=force)
  if assembled:
    return
  # roll back whatever was brought up before failing
  _ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    # hint only makes sense when the caller could have passed --force
    lu.proc.LogWarning("", hint="If the message above refers to a"
                       " secondary node,"
                       " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies the instance exists in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks, refusing if the instance is running.

    """
    _SafeShutdownInstanceDisks(self, self.instance)
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This checks that the instance is not running on its primary node
  before delegating to _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  running = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  if running.failed or not isinstance(running.data, list):
    raise errors.OpExecError("Can't contact node '%s'" % pnode)

  if instance.name in running.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, failures on the primary node do not make
  the result False (the shutdown is still attempted on every node);
  failures on any other node always make the result False.

  @return: True if all shutdowns succeeded (modulo ignored primary
      failures), False otherwise

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        # only primary-node failures can be ignored
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  node_data = lu.rpc.call_node_info([node], lu.cfg.GetVGName(),
                                    hypervisor_name)
  node_data[node].Raise()
  free_mem = node_data[node].data.get('memory_free')
  # a non-integer answer means the node could not report its memory
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if free_mem < requested:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Validates any temporary be/hv parameter overrides passed on the
    opcode, verifies the primary node is online and the bridges exist,
    and (if the instance is not already running) checks free memory.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    # these are one-shot overrides for this start only; empty dict
    # means "no overrides"
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      # validate against the fully-filled dict: cluster defaults,
      # instance values, then the one-shot overrides on top
      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if not remote_info.data:
      # instance is not currently running: make sure the primary node
      # has enough memory for its configured amount
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    # mark the instance as up in the configuration first
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
    if msg:
      # roll back the disk assembly before reporting the failure
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    # validate the reboot type before acquiring any locks
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, its primary node
    is online and the configured bridges exist.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft/hard reboots are delegated to the node in a single RPC; a
    full reboot is a shutdown followed by disk reassembly and start.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      # full reboot: stop, recycle the disks, start again
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      # None/None: no temporary hv/be parameter overrides for the restart
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        # roll back the disk assembly before reporting the failure
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build the environment for the instance-stop hooks.

    This runs on master, primary and secondary nodes of the instance.

    """
    nodes = [self.cfg.GetMasterNode()]
    nodes.extend(self.instance.all_nodes)
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies the instance exists and its primary node is online.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance, then its block devices.

    """
    instance = self.instance
    # record the new administrative state before contacting the node
    self.cfg.MarkInstanceDown(instance.name)
    reply = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    err = reply.RemoteFailMsg()
    if err:
      # a failed shutdown is logged but does not stop disk deactivation
      self.proc.LogWarning("Could not shutdown instance: %s" % err)

    _ShutdownInstanceDisks(self, instance)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  Re-runs the OS creation scripts on the instance's disks, optionally
  switching the instance to a different OS first.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and - if an OS change was requested - that the new OS is valid on
    the primary node.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # even if marked down, the instance might actually be running on
    # its primary node, so double-check via RPC
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    # os_type is optional: None means "reinstall with the current OS"
    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # this used to format the message with self.op.pnode, an
        # attribute the reinstall opcode does not carry, which turned
        # the intended OpPrereqError into an AttributeError
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    # force=None: secondary-node assembly errors are fatal and no
    # '--force' hint is printed
    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      # always deactivate the disks again, even if the OS add failed
      _ShutdownInstanceDisks(self, inst)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  The instance must be stopped; besides the configuration update, this
  also renames any file-based storage directory and runs the OS rename
  script on the primary node.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    # the instance must be down both administratively...
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # ...and actually, as reported by the hypervisor on the primary node
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification (resolution also normalizes the name)
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    # unless explicitly ignored, refuse a new name whose IP is in use
    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Updates the configuration and the instance lock, renames the
    file-storage directory (for file-based instances) and finally runs
    the OS rename script; failures of the script are only warned about,
    since the configuration rename has already happened.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      # result.data[0] is the per-node success flag of the dir rename
      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        # only warn: the Ganeti-side rename is already committed
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)
3235

    
3236

    
3237
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  The instance is shut down, its block devices deleted and it is then
  dropped from the cluster configuration; with C{ignore_failures} set,
  shutdown and disk-removal failures only produce warnings.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand the instance name and declare the needed locks."""
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Acquire the node locks once the instance lock is held."""
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    inst = self.instance
    logging.info("Shutting down instance %s on node %s",
                 inst.name, inst.primary_node)

    shutdown_res = self.rpc.call_instance_shutdown(inst.primary_node, inst)
    msg = shutdown_res.RemoteFailMsg()
    if msg:
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
      feedback_fn("Warning: can't shutdown instance: %s" % msg)

    logging.info("Removing block devices for instance %s", inst.name)

    if not _RemoveDisks(self, inst):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", inst.name)

    self.cfg.RemoveInstance(inst.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = inst.name
3305

    
3306

    
3307
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  Static fields are answered from the configuration alone; dynamic
  fields (C{oper_state}, C{oper_ram}, C{status}) additionally require a
  live query of the instances' primary nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  # fields answerable from the configuration; the r"..." entries are
  # regexes for parameterized fields such as "disk.size/0"
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  # fields that need a live RPC query against the primary nodes
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")

  def ExpandNames(self):
    """Validate the requested fields and compute the locking strategy.

    Locks are only taken when dynamic fields were requested AND the
    caller asked for locking; otherwise the query runs lock-free on
    possibly slightly stale configuration data.

    """
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # NonMatching() is true iff at least one non-static field was asked for
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Acquire the node locks (only in locking mode)."""
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    @return: a list (one entry per instance) of lists of field values,
        in the order of C{self.op.output_fields}

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    # bad_nodes: RPC failed; off_nodes: marked offline in the config
    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      # no live queries wanted: pretend every instance reported nothing
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        # st_match carries the regex groups for parameterized fields
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          # None means "unknown" (primary node unreachable)
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "vcpus":
          val = i_be[constants.BE_VCPUS]
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          if instance.nics:
            val = instance.nics[0].ip
          else:
            val = None
        elif field == "bridge":
          if instance.nics:
            val = instance.nics[0].bridge
          else:
            val = None
        elif field == "mac":
          if instance.nics:
            val = instance.nics[0].mac
          else:
            val = None
        elif field == "sda_size" or field == "sdb_size":
          # legacy names: sda -> disk 0, sdb -> disk 1
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, ("Declared but unhandled variable parameter '%s'" %
                           field)
        else:
          assert False, "Declared but unhandled parameter '%s'" % field
        iout.append(val)
      output.append(iout)

    return output
3555

    
3556

    
3557
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  The instance is shut down on its primary node and restarted on its
  (network-mirrored) secondary; only valid for DTS_NET_MIRROR disk
  templates.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand the instance name and declare the needed locks."""
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Acquire the node locks once the instance lock is held."""
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    # the instance is already locked (ExpandNames), so it must exist
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)

    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    # a degraded target disk aborts the failover unless the instance is
    # down anyway or the caller asked to ignore consistency
    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        # best-effort: the source node may be dead, which is exactly
        # the situation a forced failover is for
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        # clean up the freshly-assembled disks before bailing out
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))
3694

    
3695

    
3696
class LUMigrateInstance(LogicalUnit):
3697
  """Migrate an instance.
3698

3699
  This is migration without shutting down, compared to the failover,
3700
  which is done with shutdown.
3701

3702
  """
3703
  HPATH = "instance-migrate"
3704
  HTYPE = constants.HTYPE_INSTANCE
3705
  _OP_REQP = ["instance_name", "live", "cleanup"]
3706

    
3707
  REQ_BGL = False
3708

    
3709
  def ExpandNames(self):
    """Expand the instance name and declare the needed locks.

    The node locks are filled in later by DeclareLocks, once the
    instance lock is held.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3713

    
3714
  def DeclareLocks(self, level):
    """Acquire the node-level locks once the instance lock is held."""
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
3717

    
3718
  def BuildHooksEnv(self):
    """Build the environment for the migration hooks.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env.update({
      "MIGRATE_LIVE": self.op.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      })
    node_list = [self.cfg.GetMasterNode()]
    node_list.extend(self.instance.secondary_nodes)
    return env, node_list, node_list
3729

    
3730
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses drbd8, that
    the target node can host it (memory, bridges) and that it reports
    itself as migratable.

    """
    # The instance name was already expanded and the instance locked in
    # ExpandNames (via _ExpandAndLockInstance), so it must still be in the
    # configuration; fetch it directly instead of re-expanding the name.
    # This matches the pattern used by LUFailoverInstance.CheckPrereq.
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      # a real migration (not a cleanup) additionally needs an
      # undrained target and a hypervisor-level migratability check
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance
3777

    
3778
  def _WaitUntilSync(self):
    """Busy-wait until the instance's DRBD disks are in sync everywhere.

    This uses our own step-based rpc call; while waiting, the minimum
    sync progress over all nodes is reported to the caller.

    """
    self.feedback_fn("* wait until resync is done")
    while True:
      sync_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      lowest_percent = 100
      for node, nres in result.items():
        fail_msg = nres.RemoteFailMsg()
        if fail_msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, fail_msg))
        node_done, node_percent = nres.payload
        if not node_done:
          sync_done = False
        if node_percent is not None:
          lowest_percent = min(lowest_percent, node_percent)
      if sync_done:
        break
      if lowest_percent < 100:
        self.feedback_fn("   - progress: %.1f%%" % lowest_percent)
      time.sleep(2)
3805

    
3806
  def _EnsureSecondary(self, node):
    """Demote all of the instance's disks on the given node to secondary.

    @param node: name of the node whose block devices will be closed

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    # point the disk IDs at the given node before talking to it
    for disk in self.instance.disks:
      self.cfg.SetDiskID(disk, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    fail_msg = result.RemoteFailMsg()
    if fail_msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, fail_msg))
3821

    
3822
  def _GoStandalone(self):
    """Disconnect the instance's disks from the replication network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      fail_msg = nres.RemoteFailMsg()
      if fail_msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, fail_msg))
3834

    
3835
  def _GoReconnect(self, multimaster):
    """Reattach the instance's disks to the replication network.

    @param multimaster: whether to connect the disks in dual-master mode
        (needed during live migration) or in single-master mode

    """
    if multimaster:
      mode = "dual-master"
    else:
      mode = "single-master"
    self.feedback_fn("* changing disks into %s mode" % mode)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      fail_msg = nres.RemoteFailMsg()
      if fail_msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, fail_msg))
3852

    
3853
  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    # NOTE(review): self.target_node/self.source_node are attributes set
    # outside this method (presumably by Exec before dispatching here) —
    # confirm against the caller, which is not visible in this chunk
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    # refuse to guess in the ambiguous cases: running on both nodes or
    # on neither requires manual intervention
    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    # demote the non-running side, then cycle the disks back to a clean
    # single-master configuration
    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")
3919

    
3920
  def _RevertDiskStatus(self):
3921
    """Try to revert the disk status after a failed migration.
3922

3923
    """
3924
    target_node = self.target_node
3925
    try:
3926
      self._EnsureSecondary(target_node)
3927
      self._GoStandalone()
3928
      self._GoReconnect(False)
3929
      self._WaitUntilSync()
3930
    except errors.OpExecError, err:
3931
      self.LogWarning("Migration failed and I can't reconnect the"
3932
                      " drives: error '%s'\n"
3933
                      "Please look and recover the instance status" %
3934
                      str(err))
3935

    
3936
  def _AbortMigration(self):
3937
    """Call the hypervisor code to abort a started migration.
3938

3939
    """
3940
    instance = self.instance
3941
    target_node = self.target_node
3942
    migration_info = self.migration_info
3943

    
3944
    abort_result = self.rpc.call_finalize_migration(target_node,
3945
                                                    instance,
3946
                                                    migration_info,
3947
                                                    False)
3948
    abort_msg = abort_result.RemoteFailMsg()
3949
    if abort_msg:
3950
      logging.error("Aborting migration failed on target node %s: %s" %
3951
                    (target_node, abort_msg))
3952
      # Don't raise an exception here, as we stil have to try to revert the
3953
      # disk status, even if this step failed.
3954

    
3955
  def _ExecMigration(self):
3956
    """Migrate an instance.
3957

3958
    The migrate is done by:
3959
      - change the disks into dual-master mode
3960
      - wait until disks are fully synchronized again
3961
      - migrate the instance
3962
      - change disks on the new secondary node (the old primary) to secondary
3963
      - wait until disks are fully synchronized
3964
      - change disks into single-master mode
3965

3966
    """
3967
    instance = self.instance
3968
    target_node = self.target_node
3969
    source_node = self.source_node
3970

    
3971
    self.feedback_fn("* checking disk consistency between source and target")
3972
    for dev in instance.disks:
3973
      if not _CheckDiskConsistency(self, dev, target_node, False):
3974
        raise errors.OpExecError("Disk %s is degraded or not fully"
3975
                                 " synchronized on target node,"
3976
                                 " aborting migrate." % dev.iv_name)
3977

    
3978
    # First get the migration information from the remote node
3979
    result = self.rpc.call_migration_info(source_node, instance)
3980
    msg = result.RemoteFailMsg()
3981
    if msg:
3982
      log_err = ("Failed fetching source migration information from %s: %s" %
3983
                 (source_node, msg))
3984
      logging.error(log_err)
3985
      raise errors.OpExecError(log_err)
3986

    
3987
    self.migration_info = migration_info = result.payload
3988

    
3989
    # Then switch the disks to master/master mode
3990
    self._EnsureSecondary(target_node)
3991
    self._GoStandalone()
3992
    self._GoReconnect(True)
3993
    self._WaitUntilSync()
3994

    
3995
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3996
    result = self.rpc.call_accept_instance(target_node,
3997
                                           instance,
3998
                                           migration_info,
3999
                                           self.nodes_ip[target_node])
4000

    
4001
    msg = result.RemoteFailMsg()
4002
    if msg:
4003