lib/cmdlib.py @ revision abae1b2b


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


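# Editorial note, not part of the original module: a minimal sketch of a
# LogicalUnit subclass following the contract documented above. The opcode
# field (node_name) and the LU name are hypothetical and only illustrate how
# ExpandNames/CheckPrereq/Exec typically fit together; BuildHooksEnv is not
# needed here because HPATH stays None.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = None
#     HTYPE = None
#     _OP_REQP = ["node_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # canonicalize names and declare the locks we need
#       full_name = self.cfg.ExpandNodeName(self.op.node_name)
#       if full_name is None:
#         raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
#       self.op.node_name = full_name
#       self.needed_locks = {locking.LEVEL_NODE: [full_name]}
#
#     def CheckPrereq(self):
#       # idempotent checks only; raise OpPrereqError on failure
#       if self.cfg.GetNodeInfo(self.op.node_name).offline:
#         raise errors.OpPrereqError("Can't use offline node %s" %
#                                    self.op.node_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Nothing to do for %s" % self.op.node_name)

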
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


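# Editorial note, not part of the original module: a sketch of how an LU can
# be built from tasklets instead of defining its own CheckPrereq/Exec. The
# tasklet class shown here is hypothetical.
#
#   class TLExampleCheck(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       if self.cfg.GetInstanceInfo(self.instance_name) is None:
#         raise errors.OpPrereqError("Instance '%s' not known" %
#                                    self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Checked %s" % self.instance_name)
#
#   # In the owning LU, ExpandNames would set up locking and then assign:
#   #   self.tasklets = [TLExampleCheck(self, self.op.instance_name)]
#   # after which the base LogicalUnit.CheckPrereq and Exec run each tasklet
#   # in order.

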
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env

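# Editorial note, not part of the original module: for an instance with one
# bridged NIC and one disk, _BuildInstanceHookEnv returns a dict roughly like
# the following (all values made up for illustration), plus one
# INSTANCE_BE_*/INSTANCE_HV_* entry per backend/hypervisor parameter. The
# hooks runner later prefixes each key with "GANETI_" before exporting it to
# the hook scripts' environment.
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debootstrap",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_DISK_TEMPLATE": "drbd",
#     "INSTANCE_HYPERVISOR": "xen-pvm",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_NIC0_MODE": "bridged",
#     "INSTANCE_NIC0_LINK": "xen-br0",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     "INSTANCE_DISK0_MODE": "rw",
#   }
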
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics

def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  instances = []

  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
    if node_name == inst.primary_node:
      instances.append(inst)

  return instances


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  instances = []

  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
    if node_name in inst.secondary_nodes:
      instances.append(inst)

  return instances


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if msg:
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
1461
  """Rename the cluster.
1462

1463
  """
1464
  HPATH = "cluster-rename"
1465
  HTYPE = constants.HTYPE_CLUSTER
1466
  _OP_REQP = ["name"]
1467

    
1468
  def BuildHooksEnv(self):
1469
    """Build hooks env.
1470

1471
    """
1472
    env = {
1473
      "OP_TARGET": self.cfg.GetClusterName(),
1474
      "NEW_NAME": self.op.name,
1475
      }
1476
    mn = self.cfg.GetMasterNode()
1477
    return env, [mn], [mn]
1478

    
1479
  def CheckPrereq(self):
1480
    """Verify that the passed name is a valid one.
1481

1482
    """
1483
    hostname = utils.HostInfo(self.op.name)
1484

    
1485
    new_name = hostname.name
1486
    self.ip = new_ip = hostname.ip
1487
    old_name = self.cfg.GetClusterName()
1488
    old_ip = self.cfg.GetMasterIP()
1489
    if new_name == old_name and new_ip == old_ip:
1490
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1491
                                 " cluster has changed")
1492
    if new_ip != old_ip:
1493
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1494
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1495
                                   " reachable on the network. Aborting." %
1496
                                   new_ip)
1497

    
1498
    self.op.name = new_name
1499

    
1500
  def Exec(self, feedback_fn):
1501
    """Rename the cluster.
1502

1503
    """
1504
    clustername = self.op.name
1505
    ip = self.ip
1506

    
1507
    # shutdown the master IP
1508
    master = self.cfg.GetMasterNode()
1509
    result = self.rpc.call_node_stop_master(master, False)
1510
    result.Raise("Could not disable the master role")
1511

    
1512
    try:
1513
      cluster = self.cfg.GetClusterInfo()
1514
      cluster.cluster_name = clustername
1515
      cluster.master_ip = ip
1516
      self.cfg.Update(cluster)
1517

    
1518
      # update the known hosts file
1519
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1520
      node_list = self.cfg.GetNodeList()
1521
      try:
1522
        node_list.remove(master)
1523
      except ValueError:
1524
        pass
1525
      result = self.rpc.call_upload_file(node_list,
1526
                                         constants.SSH_KNOWN_HOSTS_FILE)
1527
      for to_node, to_result in result.iteritems():
1528
        msg = to_result.fail_msg
1529
        if msg:
1530
          msg = ("Copy of file %s to node %s failed: %s" %
1531
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1532
          self.proc.LogWarning(msg)
1533

    
1534
    finally:
1535
      result = self.rpc.call_node_start_master(master, False, False)
1536
      msg = result.fail_msg
1537
      if msg:
1538
        self.LogWarning("Could not re-enable the master role on"
1539
                        " the master, please restart manually: %s", msg)
1540

    
1541

    
1542
def _RecursiveCheckIfLVMBased(disk):
1543
  """Check if the given disk or its children are lvm-based.
1544

1545
  @type disk: L{objects.Disk}
1546
  @param disk: the disk to check
1547
  @rtype: boolean
1548
  @return: boolean indicating whether an LD_LV dev_type was found or not
1549

1550
  """
1551
  if disk.children:
1552
    for chdisk in disk.children:
1553
      if _RecursiveCheckIfLVMBased(chdisk):
1554
        return True
1555
  return disk.dev_type == constants.LD_LV
1556

    
1557

    
1558
class LUSetClusterParams(LogicalUnit):
1559
  """Change the parameters of the cluster.
1560

1561
  """
1562
  HPATH = "cluster-modify"
1563
  HTYPE = constants.HTYPE_CLUSTER
1564
  _OP_REQP = []
1565
  REQ_BGL = False
1566

    
1567
  def CheckArguments(self):
1568
    """Check parameters
1569

1570
    """
1571
    if not hasattr(self.op, "candidate_pool_size"):
1572
      self.op.candidate_pool_size = None
1573
    if self.op.candidate_pool_size is not None:
1574
      try:
1575
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1576
      except (ValueError, TypeError), err:
1577
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1578
                                   str(err))
1579
      if self.op.candidate_pool_size < 1:
1580
        raise errors.OpPrereqError("At least one master candidate needed")
1581

    
1582
  def ExpandNames(self):
1583
    # FIXME: in the future maybe other cluster params won't require checking on
1584
    # all nodes to be modified.
1585
    self.needed_locks = {
1586
      locking.LEVEL_NODE: locking.ALL_SET,
1587
    }
1588
    self.share_locks[locking.LEVEL_NODE] = 1
1589

    
1590
  def BuildHooksEnv(self):
1591
    """Build hooks env.
1592

1593
    """
1594
    env = {
1595
      "OP_TARGET": self.cfg.GetClusterName(),
1596
      "NEW_VG_NAME": self.op.vg_name,
1597
      }
1598
    mn = self.cfg.GetMasterNode()
1599
    return env, [mn], [mn]
1600

    
1601
  def CheckPrereq(self):
1602
    """Check prerequisites.
1603

1604
    This checks whether the given params don't conflict and
1605
    if the given volume group is valid.
1606

1607
    """
1608
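    # a vg_name that is set but empty means "disable lvm storage"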
    if self.op.vg_name is not None and not self.op.vg_name:
1609
      instances = self.cfg.GetAllInstancesInfo().values()
1610
      for inst in instances:
1611
        for disk in inst.disks:
1612
          if _RecursiveCheckIfLVMBased(disk):
1613
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1614
                                       " lvm-based instances exist")
1615

    
1616
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1617

    
1618
    # if vg_name not None, checks given volume group on all nodes
1619
    if self.op.vg_name:
1620
      vglist = self.rpc.call_vg_list(node_list)
1621
      for node in node_list:
1622
        msg = vglist[node].fail_msg
1623
        if msg:
1624
          # ignoring down node
1625
          self.LogWarning("Error while gathering data on node %s"
1626
                          " (ignoring node): %s", node, msg)
1627
          continue
1628
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1629
                                              self.op.vg_name,
1630
                                              constants.MIN_VG_SIZE)
1631
        if vgstatus:
1632
          raise errors.OpPrereqError("Error on node '%s': %s" %
1633
                                     (node, vgstatus))
1634

    
1635
    self.cluster = cluster = self.cfg.GetClusterInfo()
1636
    # validate params changes
1637
    if self.op.beparams:
1638
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1639
      self.new_beparams = objects.FillDict(
1640
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1641

    
1642
    if self.op.nicparams:
1643
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1644
      self.new_nicparams = objects.FillDict(
1645
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1646
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1647

    
1648
    # hypervisor list/parameters
1649
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1650
    if self.op.hvparams:
1651
      if not isinstance(self.op.hvparams, dict):
1652
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1653
      for hv_name, hv_dict in self.op.hvparams.items():
1654
        if hv_name not in self.new_hvparams:
1655
          self.new_hvparams[hv_name] = hv_dict
1656
        else:
1657
          self.new_hvparams[hv_name].update(hv_dict)
1658

    
1659
    if self.op.enabled_hypervisors is not None:
1660
      self.hv_list = self.op.enabled_hypervisors
1661
      if not self.hv_list:
1662
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1663
                                   " least one member")
1664
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1665
      if invalid_hvs:
1666
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1667
                                   " entries: %s" % invalid_hvs)
1668
    else:
1669
      self.hv_list = cluster.enabled_hypervisors
1670

    
1671
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1672
      # either the enabled list has changed, or the parameters have, validate
1673
      for hv_name, hv_params in self.new_hvparams.items():
1674
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1675
            (self.op.enabled_hypervisors and
1676
             hv_name in self.op.enabled_hypervisors)):
1677
          # either this is a new hypervisor, or its parameters have changed
1678
          hv_class = hypervisor.GetHypervisor(hv_name)
1679
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1680
          hv_class.CheckParameterSyntax(hv_params)
1681
          _CheckHVParams(self, node_list, hv_name, hv_params)
1682

    
1683
  def Exec(self, feedback_fn):
1684
    """Change the parameters of the cluster.
1685

1686
    """
1687
    if self.op.vg_name is not None:
1688
      new_volume = self.op.vg_name
1689
      if not new_volume:
1690
        new_volume = None
1691
      if new_volume != self.cfg.GetVGName():
1692
        self.cfg.SetVGName(new_volume)
1693
      else:
1694
        feedback_fn("Cluster LVM configuration already in desired"
1695
                    " state, not changing")
1696
    if self.op.hvparams:
1697
      self.cluster.hvparams = self.new_hvparams
1698
    if self.op.enabled_hypervisors is not None:
1699
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1700
    if self.op.beparams:
1701
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1702
    if self.op.nicparams:
1703
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1704

    
1705
    if self.op.candidate_pool_size is not None:
1706
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1707
      # we need to update the pool size here, otherwise the save will fail
1708
      _AdjustCandidatePool(self)
1709

    
1710
    self.cfg.Update(self.cluster)
1711

    
1712

    
1713
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1714
  """Distribute additional files which are part of the cluster configuration.
1715

1716
  ConfigWriter takes care of distributing the config and ssconf files, but
1717
  there are more files which should be distributed to all nodes. This function
1718
  makes sure those are copied.
1719

1720
  @param lu: calling logical unit
1721
  @param additional_nodes: list of nodes not in the config to distribute to
1722

1723
  """
1724
  # 1. Gather target nodes
1725
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1726
  dist_nodes = lu.cfg.GetNodeList()
1727
  if additional_nodes is not None:
1728
    dist_nodes.extend(additional_nodes)
1729
  if myself.name in dist_nodes:
1730
    dist_nodes.remove(myself.name)
1731
  # 2. Gather files to distribute
1732
  dist_files = set([constants.ETC_HOSTS,
1733
                    constants.SSH_KNOWN_HOSTS_FILE,
1734
                    constants.RAPI_CERT_FILE,
1735
                    constants.RAPI_USERS_FILE,
1736
                    constants.HMAC_CLUSTER_KEY,
1737
                   ])
1738

    
1739
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1740
  for hv_name in enabled_hypervisors:
1741
    hv_class = hypervisor.GetHypervisor(hv_name)
1742
    dist_files.update(hv_class.GetAncillaryFiles())
1743

    
1744
  # 3. Perform the files upload
1745
  for fname in dist_files:
1746
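    # some of the files above are optional, so upload only those that
    # actually exist on the master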
    if os.path.exists(fname):
1747
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1748
      for to_node, to_result in result.items():
1749
        msg = to_result.fail_msg
1750
        if msg:
1751
          msg = ("Copy of file %s to node %s failed: %s" %
1752
                 (fname, to_node, msg))
1753
          lu.proc.LogWarning(msg)
1754

    
1755

    
1756
class LURedistributeConfig(NoHooksLU):
1757
  """Force the redistribution of cluster configuration.
1758

1759
  This is a very simple LU.
1760

1761
  """
1762
  _OP_REQP = []
1763
  REQ_BGL = False
1764

    
1765
  def ExpandNames(self):
1766
    self.needed_locks = {
1767
      locking.LEVEL_NODE: locking.ALL_SET,
1768
    }
1769
    self.share_locks[locking.LEVEL_NODE] = 1
1770

    
1771
  def CheckPrereq(self):
1772
    """Check prerequisites.
1773

1774
    """
1775

    
1776
  def Exec(self, feedback_fn):
1777
    """Redistribute the configuration.
1778

1779
    """
1780
    self.cfg.Update(self.cfg.GetClusterInfo())
1781
    _RedistributeAncillaryFiles(self)
1782

    
1783

    
1784
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1785
  """Sleep and poll for an instance's disk to sync.
1786

1787
  """
1788
  if not instance.disks:
1789
    return True
1790

    
1791
  if not oneshot:
1792
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1793

    
1794
  node = instance.primary_node
1795

    
1796
  for dev in instance.disks:
1797
    lu.cfg.SetDiskID(dev, node)
1798

    
1799
  retries = 0
1800
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1801
  while True:
1802
    max_time = 0
1803
    done = True
1804
    cumul_degraded = False
1805
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1806
    msg = rstats.fail_msg
1807
    if msg:
1808
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1809
      retries += 1
1810
      if retries >= 10:
1811
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1812
                                 " aborting." % node)
1813
      time.sleep(6)
1814
      continue
1815
    rstats = rstats.payload
1816
    retries = 0
1817
    for i, mstat in enumerate(rstats):
1818
      if mstat is None:
1819
        lu.LogWarning("Can't compute data for node %s/%s",
1820
                           node, instance.disks[i].iv_name)
1821
        continue
1822
      # we ignore the ldisk parameter
1823
      perc_done, est_time, is_degraded, _ = mstat
1824
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1825
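      # a perc_done of None means this mirror is no longer syncing, i.e.
      # it is considered done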
      if perc_done is not None:
1826
        done = False
1827
        if est_time is not None:
1828
          rem_time = "%d estimated seconds remaining" % est_time
1829
          max_time = est_time
1830
        else:
1831
          rem_time = "no time estimate"
1832
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1833
                        (instance.disks[i].iv_name, perc_done, rem_time))
1834

    
1835
    # if we're done but degraded, let's do a few small retries, to
1836
    # make sure we see a stable and not transient situation; therefore
1837
    # we force restart of the loop
1838
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1839
      logging.info("Degraded disks found, %d retries left", degr_retries)
1840
      degr_retries -= 1
1841
      time.sleep(1)
1842
      continue
1843

    
1844
    if done or oneshot:
1845
      break
1846

    
1847
    time.sleep(min(60, max_time))
1848

    
1849
  if done:
1850
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1851
  return not cumul_degraded
1852

    
1853

    
1854
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1855
  """Check that mirrors are not degraded.
1856

1857
  The ldisk parameter, if True, will change the test from the
1858
  is_degraded attribute (which represents overall non-ok status for
1859
  the device(s)) to the ldisk (representing the local storage status).
1860

1861
  """
1862
  lu.cfg.SetDiskID(dev, node)
1863
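  # index into the status payload of call_blockdev_find: as per the
  # docstring above, field 5 is the overall is_degraded flag and field 6
  # the local-disk (ldisk) status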
  if ldisk:
1864
    idx = 6
1865
  else:
1866
    idx = 5
1867

    
1868
  result = True
1869
  if on_primary or dev.AssembleOnSecondary():
1870
    rstats = lu.rpc.call_blockdev_find(node, dev)
1871
    msg = rstats.fail_msg
1872
    if msg:
1873
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1874
      result = False
1875
    elif not rstats.payload:
1876
      lu.LogWarning("Can't find disk on node %s", node)
1877
      result = False
1878
    else:
1879
      result = result and (not rstats.payload[idx])
1880
  if dev.children:
1881
    for child in dev.children:
1882
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1883

    
1884
  return result
1885

    
1886

    
1887
class LUDiagnoseOS(NoHooksLU):
1888
  """Logical unit for OS diagnose/query.
1889

1890
  """
1891
  _OP_REQP = ["output_fields", "names"]
1892
  REQ_BGL = False
1893
  _FIELDS_STATIC = utils.FieldSet()
1894
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1895

    
1896
  def ExpandNames(self):
1897
    if self.op.names:
1898
      raise errors.OpPrereqError("Selective OS query not supported")
1899

    
1900
    _CheckOutputFields(static=self._FIELDS_STATIC,
1901
                       dynamic=self._FIELDS_DYNAMIC,
1902
                       selected=self.op.output_fields)
1903

    
1904
    # Lock all nodes, in shared mode
1905
    # Temporary removal of locks, should be reverted later
1906
    # TODO: reintroduce locks when they are lighter-weight
1907
    self.needed_locks = {}
1908
    #self.share_locks[locking.LEVEL_NODE] = 1
1909
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1910

    
1911
  def CheckPrereq(self):
1912
    """Check prerequisites.
1913

1914
    """
1915

    
1916
  @staticmethod
1917
  def _DiagnoseByOS(node_list, rlist):
1918
    """Remaps a per-node return list into an a per-os per-node dictionary
1919

1920
    @param node_list: a list with the names of all nodes
1921
    @param rlist: a map with node names as keys and OS objects as values
1922

1923
    @rtype: dict
1924
    @return: a dictionary with osnames as keys and as value another map, with
1925
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
1926

1927
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1928
                                     (/srv/..., False, "invalid api")],
1929
                           "node2": [(/srv/..., True, "")]}
1930
          }
1931

1932
    """
1933
    all_os = {}
1934
    # we build here the list of nodes that didn't fail the RPC (at RPC
1935
    # level), so that nodes with a non-responding node daemon don't
1936
    # make all OSes invalid
1937
    good_nodes = [node_name for node_name in rlist
1938
                  if not rlist[node_name].fail_msg]
1939
    for node_name, nr in rlist.items():
1940
      if nr.fail_msg or not nr.payload:
1941
        continue
1942
      for name, path, status, diagnose in nr.payload:
1943
        if name not in all_os:
1944
          # build a list of nodes for this os containing empty lists
1945
          # for each node in node_list
1946
          all_os[name] = {}
1947
          for nname in good_nodes:
1948
            all_os[name][nname] = []
1949
        all_os[name][node_name].append((path, status, diagnose))
1950
    return all_os
1951

    
1952
  def Exec(self, feedback_fn):
1953
    """Compute the list of OSes.
1954

1955
    """
1956
    valid_nodes = self.cfg.GetOnlineNodeList()
1957
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1958
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1959
    output = []
1960
    for os_name, os_data in pol.items():
1961
      row = []
1962
      for field in self.op.output_fields:
1963
        if field == "name":
1964
          val = os_name
1965
        elif field == "valid":
1966
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1967
        elif field == "node_status":
1968
          # this is just a copy of the dict
1969
          val = {}
1970
          for node_name, nos_list in os_data.items():
1971
            val[node_name] = nos_list
1972
        else:
1973
          raise errors.ParameterError(field)
1974
        row.append(val)
1975
      output.append(row)
1976

    
1977
    return output
1978

    
1979

    
1980
class LURemoveNode(LogicalUnit):
1981
  """Logical unit for removing a node.
1982

1983
  """
1984
  HPATH = "node-remove"
1985
  HTYPE = constants.HTYPE_NODE
1986
  _OP_REQP = ["node_name"]
1987

    
1988
  def BuildHooksEnv(self):
1989
    """Build hooks env.
1990

1991
    This doesn't run on the target node in the pre phase as a failed
1992
    node would then be impossible to remove.
1993

1994
    """
1995
    env = {
1996
      "OP_TARGET": self.op.node_name,
1997
      "NODE_NAME": self.op.node_name,
1998
      }
1999
    all_nodes = self.cfg.GetNodeList()
2000
    all_nodes.remove(self.op.node_name)
2001
    return env, all_nodes, all_nodes
2002

    
2003
  def CheckPrereq(self):
2004
    """Check prerequisites.
2005

2006
    This checks:
2007
     - the node exists in the configuration
2008
     - it does not have primary or secondary instances
2009
     - it's not the master
2010

2011
    Any errors are signaled by raising errors.OpPrereqError.
2012

2013
    """
2014
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2015
    if node is None:
2016
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
2017

    
2018
    instance_list = self.cfg.GetInstanceList()
2019

    
2020
    masternode = self.cfg.GetMasterNode()
2021
    if node.name == masternode:
2022
      raise errors.OpPrereqError("Node is the master node,"
2023
                                 " you need to failover first.")
2024

    
2025
    for instance_name in instance_list:
2026
      instance = self.cfg.GetInstanceInfo(instance_name)
2027
      if node.name in instance.all_nodes:
2028
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2029
                                   " please remove first." % instance_name)
2030
    self.op.node_name = node.name
2031
    self.node = node
2032

    
2033
  def Exec(self, feedback_fn):
2034
    """Removes the node from the cluster.
2035

2036
    """
2037
    node = self.node
2038
    logging.info("Stopping the node daemon and removing configs from node %s",
2039
                 node.name)
2040

    
2041
    self.context.RemoveNode(node.name)
2042

    
2043
    result = self.rpc.call_node_leave_cluster(node.name)
2044
    msg = result.fail_msg
2045
    if msg:
2046
      self.LogWarning("Errors encountered on the remote node while leaving"
2047
                      " the cluster: %s", msg)
2048

    
2049
    # Promote nodes to master candidate as needed
2050
    _AdjustCandidatePool(self)
2051

    
2052

    
2053
class LUQueryNodes(NoHooksLU):
2054
  """Logical unit for querying nodes.
2055

2056
  """
2057
  _OP_REQP = ["output_fields", "names", "use_locking"]
2058
  REQ_BGL = False
2059
  _FIELDS_DYNAMIC = utils.FieldSet(
2060
    "dtotal", "dfree",
2061
    "mtotal", "mnode", "mfree",
2062
    "bootid",
2063
    "ctotal", "cnodes", "csockets",
2064
    )
2065

    
2066
  _FIELDS_STATIC = utils.FieldSet(
2067
    "name", "pinst_cnt", "sinst_cnt",
2068
    "pinst_list", "sinst_list",
2069
    "pip", "sip", "tags",
2070
    "serial_no",
2071
    "master_candidate",
2072
    "master",
2073
    "offline",
2074
    "drained",
2075
    "role",
2076
    )
2077

    
2078
  def ExpandNames(self):
2079
    _CheckOutputFields(static=self._FIELDS_STATIC,
2080
                       dynamic=self._FIELDS_DYNAMIC,
2081
                       selected=self.op.output_fields)
2082

    
2083
    self.needed_locks = {}
2084
    self.share_locks[locking.LEVEL_NODE] = 1
2085

    
2086
    if self.op.names:
2087
      self.wanted = _GetWantedNodes(self, self.op.names)
2088
    else:
2089
      self.wanted = locking.ALL_SET
2090

    
2091
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2092
    self.do_locking = self.do_node_query and self.op.use_locking
2093
    if self.do_locking:
2094
      # if we don't request only static fields, we need to lock the nodes
2095
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2096

    
2097

    
2098
  def CheckPrereq(self):
2099
    """Check prerequisites.
2100

2101
    """
2102
    # The validation of the node list is done in the _GetWantedNodes,
2103
    # if non empty, and if empty, there's no validation to do
2104
    pass
2105

    
2106
  def Exec(self, feedback_fn):
2107
    """Computes the list of nodes and their attributes.
2108

2109
    """
2110
    all_info = self.cfg.GetAllNodesInfo()
2111
    if self.do_locking:
2112
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2113
    elif self.wanted != locking.ALL_SET:
2114
      nodenames = self.wanted
2115
      missing = set(nodenames).difference(all_info.keys())
2116
      if missing:
2117
        raise errors.OpExecError(
2118
          "Some nodes were removed before retrieving their data: %s" % missing)
2119
    else:
2120
      nodenames = all_info.keys()
2121

    
2122
    nodenames = utils.NiceSort(nodenames)
2123
    nodelist = [all_info[name] for name in nodenames]
2124

    
2125
    # begin data gathering
2126

    
2127
    if self.do_node_query:
2128
      live_data = {}
2129
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2130
                                          self.cfg.GetHypervisorType())
2131
      for name in nodenames:
2132
        nodeinfo = node_data[name]
2133
        if not nodeinfo.fail_msg and nodeinfo.payload:
2134
          nodeinfo = nodeinfo.payload
2135
          fn = utils.TryConvert
2136
          live_data[name] = {
2137
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2138
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2139
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2140
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2141
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2142
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2143
            "bootid": nodeinfo.get('bootid', None),
2144
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2145
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2146
            }
2147
        else:
2148
          live_data[name] = {}
2149
    else:
2150
      live_data = dict.fromkeys(nodenames, {})
2151

    
2152
    node_to_primary = dict([(name, set()) for name in nodenames])
2153
    node_to_secondary = dict([(name, set()) for name in nodenames])
2154

    
2155
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2156
                             "sinst_cnt", "sinst_list"))
2157
    if inst_fields & frozenset(self.op.output_fields):
2158
      instancelist = self.cfg.GetInstanceList()
2159

    
2160
      for instance_name in instancelist:
2161
        inst = self.cfg.GetInstanceInfo(instance_name)
2162
        if inst.primary_node in node_to_primary:
2163
          node_to_primary[inst.primary_node].add(inst.name)
2164
        for secnode in inst.secondary_nodes:
2165
          if secnode in node_to_secondary:
2166
            node_to_secondary[secnode].add(inst.name)
2167

    
2168
    master_node = self.cfg.GetMasterNode()
2169

    
2170
    # end data gathering
2171

    
2172
    output = []
2173
    for node in nodelist:
2174
      node_output = []
2175
      for field in self.op.output_fields:
2176
        if field == "name":
2177
          val = node.name
2178
        elif field == "pinst_list":
2179
          val = list(node_to_primary[node.name])
2180
        elif field == "sinst_list":
2181
          val = list(node_to_secondary[node.name])
2182
        elif field == "pinst_cnt":
2183
          val = len(node_to_primary[node.name])
2184
        elif field == "sinst_cnt":
2185
          val = len(node_to_secondary[node.name])
2186
        elif field == "pip":
2187
          val = node.primary_ip
2188
        elif field == "sip":
2189
          val = node.secondary_ip
2190
        elif field == "tags":
2191
          val = list(node.GetTags())
2192
        elif field == "serial_no":
2193
          val = node.serial_no
2194
        elif field == "master_candidate":
2195
          val = node.master_candidate
2196
        elif field == "master":
2197
          val = node.name == master_node
2198
        elif field == "offline":
2199
          val = node.offline
2200
        elif field == "drained":
2201
          val = node.drained
2202
        elif self._FIELDS_DYNAMIC.Matches(field):
2203
          val = live_data[node.name].get(field, None)
2204
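        # one-letter role summary: M(aster), C(andidate), D(rained),
        # O(ffline), with "R" for any other (regular) node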
        elif field == "role":
2205
          if node.name == master_node:
2206
            val = "M"
2207
          elif node.master_candidate:
2208
            val = "C"
2209
          elif node.drained:
2210
            val = "D"
2211
          elif node.offline:
2212
            val = "O"
2213
          else:
2214
            val = "R"
2215
        else:
2216
          raise errors.ParameterError(field)
2217
        node_output.append(val)
2218
      output.append(node_output)
2219

    
2220
    return output
2221

    
2222

    
2223
class LUQueryNodeVolumes(NoHooksLU):
2224
  """Logical unit for getting volumes on node(s).
2225

2226
  """
2227
  _OP_REQP = ["nodes", "output_fields"]
2228
  REQ_BGL = False
2229
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2230
  _FIELDS_STATIC = utils.FieldSet("node")
2231

    
2232
  def ExpandNames(self):
2233
    _CheckOutputFields(static=self._FIELDS_STATIC,
2234
                       dynamic=self._FIELDS_DYNAMIC,
2235
                       selected=self.op.output_fields)
2236

    
2237
    self.needed_locks = {}
2238
    self.share_locks[locking.LEVEL_NODE] = 1
2239
    if not self.op.nodes:
2240
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2241
    else:
2242
      self.needed_locks[locking.LEVEL_NODE] = \
2243
        _GetWantedNodes(self, self.op.nodes)
2244

    
2245
  def CheckPrereq(self):
2246
    """Check prerequisites.
2247

2248
    This checks that the fields required are valid output fields.
2249

2250
    """
2251
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2252

    
2253
  def Exec(self, feedback_fn):
2254
    """Computes the list of nodes and their attributes.
2255

2256
    """
2257
    nodenames = self.nodes
2258
    volumes = self.rpc.call_node_volumes(nodenames)
2259

    
2260
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2261
             in self.cfg.GetInstanceList()]
2262

    
2263
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2264

    
2265
    output = []
2266
    for node in nodenames:
2267
      nresult = volumes[node]
2268
      if nresult.offline:
2269
        continue
2270
      msg = nresult.fail_msg
2271
      if msg:
2272
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2273
        continue
2274

    
2275
      node_vols = nresult.payload[:]
2276
      node_vols.sort(key=lambda vol: vol['dev'])
2277

    
2278
      for vol in node_vols:
2279
        node_output = []
2280
        for field in self.op.output_fields:
2281
          if field == "node":
2282
            val = node
2283
          elif field == "phys":
2284
            val = vol['dev']
2285
          elif field == "vg":
2286
            val = vol['vg']
2287
          elif field == "name":
2288
            val = vol['name']
2289
          elif field == "size":
2290
            val = int(float(vol['size']))
2291
          elif field == "instance":
2292
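            # for/else: fall through to '-' when no instance owns this LV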
            for inst in ilist:
2293
              if node not in lv_by_node[inst]:
2294
                continue
2295
              if vol['name'] in lv_by_node[inst][node]:
2296
                val = inst.name
2297
                break
2298
            else:
2299
              val = '-'
2300
          else:
2301
            raise errors.ParameterError(field)
2302
          node_output.append(str(val))
2303

    
2304
        output.append(node_output)
2305

    
2306
    return output
2307

    
2308

    
2309
class LUAddNode(LogicalUnit):
2310
  """Logical unit for adding node to the cluster.
2311

2312
  """
2313
  HPATH = "node-add"
2314
  HTYPE = constants.HTYPE_NODE
2315
  _OP_REQP = ["node_name"]
2316

    
2317
  def BuildHooksEnv(self):
2318
    """Build hooks env.
2319

2320
    This will run on all nodes before, and on all nodes + the new node after.
2321

2322
    """
2323
    env = {
2324
      "OP_TARGET": self.op.node_name,
2325
      "NODE_NAME": self.op.node_name,
2326
      "NODE_PIP": self.op.primary_ip,
2327
      "NODE_SIP": self.op.secondary_ip,
2328
      }
2329
    nodes_0 = self.cfg.GetNodeList()
2330
    nodes_1 = nodes_0 + [self.op.node_name, ]
2331
    return env, nodes_0, nodes_1
2332

    
2333
  def CheckPrereq(self):
2334
    """Check prerequisites.
2335

2336
    This checks:
2337
     - the new node is not already in the config
2338
     - it is resolvable
2339
     - its parameters (single/dual homed) matches the cluster
2340

2341
    Any errors are signaled by raising errors.OpPrereqError.
2342

2343
    """
2344
    node_name = self.op.node_name
2345
    cfg = self.cfg
2346

    
2347
    dns_data = utils.HostInfo(node_name)
2348

    
2349
    node = dns_data.name
2350
    primary_ip = self.op.primary_ip = dns_data.ip
2351
    secondary_ip = getattr(self.op, "secondary_ip", None)
2352
    if secondary_ip is None:
2353
      secondary_ip = primary_ip
2354
    if not utils.IsValidIP(secondary_ip):
2355
      raise errors.OpPrereqError("Invalid secondary IP given")
2356
    self.op.secondary_ip = secondary_ip
2357

    
2358
    node_list = cfg.GetNodeList()
2359
    if not self.op.readd and node in node_list:
2360
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2361
                                 node)
2362
    elif self.op.readd and node not in node_list:
2363
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2364

    
2365
    for existing_node_name in node_list:
2366
      existing_node = cfg.GetNodeInfo(existing_node_name)
2367

    
2368
      if self.op.readd and node == existing_node_name:
2369
        if (existing_node.primary_ip != primary_ip or
2370
            existing_node.secondary_ip != secondary_ip):
2371
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2372
                                     " address configuration as before")
2373
        continue
2374

    
2375
      if (existing_node.primary_ip == primary_ip or
2376
          existing_node.secondary_ip == primary_ip or
2377
          existing_node.primary_ip == secondary_ip or
2378
          existing_node.secondary_ip == secondary_ip):
2379
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2380
                                   " existing node %s" % existing_node.name)
2381

    
2382
    # check that the type of the node (single versus dual homed) is the
2383
    # same as for the master
2384
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2385
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2386
    newbie_singlehomed = secondary_ip == primary_ip
2387
    if master_singlehomed != newbie_singlehomed:
2388
      if master_singlehomed:
2389
        raise errors.OpPrereqError("The master has no private ip but the"
2390
                                   " new node has one")
2391
      else:
2392
        raise errors.OpPrereqError("The master has a private ip but the"
2393
                                   " new node doesn't have one")
2394

    
2395
    # checks reachability
2396
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2397
      raise errors.OpPrereqError("Node not reachable by ping")
2398

    
2399
    if not newbie_singlehomed:
2400
      # check reachability from my secondary ip to newbie's secondary ip
2401
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2402
                           source=myself.secondary_ip):
2403
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2404
                                   " based ping to noded port")
2405

    
2406
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2407
    if self.op.readd:
2408
      exceptions = [node]
2409
    else:
2410
      exceptions = []
2411
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2412
    # the new node will increase mc_max with one, so:
2413
    mc_max = min(mc_max + 1, cp_size)
2414
    self.master_candidate = mc_now < mc_max
2415

    
2416
    if self.op.readd:
2417
      self.new_node = self.cfg.GetNodeInfo(node)
2418
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2419
    else:
2420
      self.new_node = objects.Node(name=node,
2421
                                   primary_ip=primary_ip,
2422
                                   secondary_ip=secondary_ip,
2423
                                   master_candidate=self.master_candidate,
2424
                                   offline=False, drained=False)
2425

    
2426
  def Exec(self, feedback_fn):
2427
    """Adds the new node to the cluster.
2428

2429
    """
2430
    new_node = self.new_node
2431
    node = new_node.name
2432

    
2433
    # for re-adds, reset the offline/drained/master-candidate flags;
2434
    # we need to reset here, otherwise offline would prevent RPC calls
2435
    # later in the procedure; this also means that if the re-add
2436
    # fails, we are left with a non-offlined, broken node
2437
    if self.op.readd:
2438
      new_node.drained = new_node.offline = False
2439
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2440
      # if we demote the node, we do cleanup later in the procedure
2441
      new_node.master_candidate = self.master_candidate
2442

    
2443
    # notify the user about any possible mc promotion
2444
    if new_node.master_candidate:
2445
      self.LogInfo("Node will be a master candidate")
2446

    
2447
    # check connectivity
2448
    result = self.rpc.call_version([node])[node]
2449
    result.Raise("Can't get version information from node %s" % node)
2450
    if constants.PROTOCOL_VERSION == result.payload:
2451
      logging.info("Communication to node %s fine, sw version %s match",
2452
                   node, result.payload)
2453
    else:
2454
      raise errors.OpExecError("Version mismatch master version %s,"
2455
                               " node version %s" %
2456
                               (constants.PROTOCOL_VERSION, result.payload))
2457

    
2458
    # setup ssh on node
2459
    logging.info("Copy ssh key to node %s", node)
2460
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2461
    keyarray = []
2462
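    # the order of keyfiles must match the positional arguments of
    # call_node_add below: DSA key pair, RSA key pair, then the ssh key
    # pair of the GANETI_RUNAS user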
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2463
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2464
                priv_key, pub_key]
2465

    
2466
    for i in keyfiles:
2467
      f = open(i, 'r')
2468
      try:
2469
        keyarray.append(f.read())
2470
      finally:
2471
        f.close()
2472

    
2473
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2474
                                    keyarray[2],
2475
                                    keyarray[3], keyarray[4], keyarray[5])
2476
    result.Raise("Cannot transfer ssh keys to the new node")
2477

    
2478
    # Add node to our /etc/hosts, and add key to known_hosts
2479
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2480
      utils.AddHostToEtcHosts(new_node.name)
2481

    
2482
    if new_node.secondary_ip != new_node.primary_ip:
2483
      result = self.rpc.call_node_has_ip_address(new_node.name,
2484
                                                 new_node.secondary_ip)
2485
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2486
                   prereq=True)
2487
      if not result.payload:
2488
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2489
                                 " you gave (%s). Please fix and re-run this"
2490
                                 " command." % new_node.secondary_ip)
2491

    
2492
    node_verify_list = [self.cfg.GetMasterNode()]
2493
    node_verify_param = {
2494
      'nodelist': [node],
2495
      # TODO: do a node-net-test as well?
2496
    }
2497

    
2498
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2499
                                       self.cfg.GetClusterName())
2500
    for verifier in node_verify_list:
2501
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2502
      nl_payload = result[verifier].payload['nodelist']
2503
      if nl_payload:
2504
        for failed in nl_payload:
2505
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2506
                      (verifier, nl_payload[failed]))
2507
        raise errors.OpExecError("ssh/hostname verification failed.")
2508

    
2509
    if self.op.readd:
2510
      _RedistributeAncillaryFiles(self)
2511
      self.context.ReaddNode(new_node)
2512
      # make sure we redistribute the config
2513
      self.cfg.Update(new_node)
2514
      # and make sure the new node will not have old files around
2515
      if not new_node.master_candidate:
2516
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2517
        msg = result.fail_msg
2518
        if msg:
2519
          self.LogWarning("Node failed to demote itself from master"
2520
                          " candidate status: %s" % msg)
2521
    else:
2522
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2523
      self.context.AddNode(new_node)
2524

    
2525

    
2526
class LUSetNodeParams(LogicalUnit):
2527
  """Modifies the parameters of a node.
2528

2529
  """
2530
  HPATH = "node-modify"
2531
  HTYPE = constants.HTYPE_NODE
2532
  _OP_REQP = ["node_name"]
2533
  REQ_BGL = False
2534

    
2535
  def CheckArguments(self):
2536
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2537
    if node_name is None:
2538
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2539
    self.op.node_name = node_name
2540
    _CheckBooleanOpField(self.op, 'master_candidate')
2541
    _CheckBooleanOpField(self.op, 'offline')
2542
    _CheckBooleanOpField(self.op, 'drained')
2543
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2544
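    # each flag is tri-state: None means "leave unchanged", True/False
    # request the corresponding change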
    if all_mods.count(None) == 3:
2545
      raise errors.OpPrereqError("Please pass at least one modification")
2546
    if all_mods.count(True) > 1:
2547
      raise errors.OpPrereqError("Can't set the node into more than one"
2548
                                 " state at the same time")
2549

    
2550
  def ExpandNames(self):
2551
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2552

    
2553
  def BuildHooksEnv(self):
2554
    """Build hooks env.
2555

2556
    This runs on the master node.
2557

2558
    """
2559
    env = {
2560
      "OP_TARGET": self.op.node_name,
2561
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2562
      "OFFLINE": str(self.op.offline),
2563
      "DRAINED": str(self.op.drained),
2564
      }
2565
    nl = [self.cfg.GetMasterNode(),
2566
          self.op.node_name]
2567
    return env, nl, nl
2568

    
2569
  def CheckPrereq(self):
2570
    """Check prerequisites.
2571

2572
    This only checks the instance list against the existing names.
2573

2574
    """
2575
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2576

    
2577
    if ((self.op.master_candidate == False or self.op.offline == True or
2578
         self.op.drained == True) and node.master_candidate):
2579
      # we will demote the node from master_candidate
2580
      if self.op.node_name == self.cfg.GetMasterNode():
2581
        raise errors.OpPrereqError("The master node has to be a"
2582
                                   " master candidate, online and not drained")
2583
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2584
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2585
      if num_candidates <= cp_size:
2586
        msg = ("Not enough master candidates (desired"
2587
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2588
        if self.op.force:
2589
          self.LogWarning(msg)
2590
        else:
2591
          raise errors.OpPrereqError(msg)
2592

    
2593
    if (self.op.master_candidate == True and
2594
        ((node.offline and not self.op.offline == False) or
2595
         (node.drained and not self.op.drained == False))):
2596
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2597
                                 " to master_candidate" % node.name)
2598

    
2599
    return
2600

    
2601
  def Exec(self, feedback_fn):
2602
    """Modifies a node.
2603

2604
    """
2605
    node = self.node
2606

    
2607
    result = []
2608
    changed_mc = False
2609

    
2610
    if self.op.offline is not None:
2611
      node.offline = self.op.offline
2612
      result.append(("offline", str(self.op.offline)))
2613
      if self.op.offline == True:
2614
        if node.master_candidate:
2615
          node.master_candidate = False
2616
          changed_mc = True
2617
          result.append(("master_candidate", "auto-demotion due to offline"))
2618
        if node.drained:
2619
          node.drained = False
2620
          result.append(("drained", "clear drained status due to offline"))
2621

    
2622
    if self.op.master_candidate is not None:
2623
      node.master_candidate = self.op.master_candidate
2624
      changed_mc = True
2625
      result.append(("master_candidate", str(self.op.master_candidate)))
2626
      if self.op.master_candidate == False:
2627
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2628
        msg = rrc.fail_msg
2629
        if msg:
2630
          self.LogWarning("Node failed to demote itself: %s" % msg)
2631

    
2632
    if self.op.drained is not None:
2633
      node.drained = self.op.drained
2634
      result.append(("drained", str(self.op.drained)))
2635
      if self.op.drained == True:
2636
        if node.master_candidate:
2637
          node.master_candidate = False
2638
          changed_mc = True
2639
          result.append(("master_candidate", "auto-demotion due to drain"))
2640
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2641
          msg = rrc.fail_msg
2642
          if msg:
2643
            self.LogWarning("Node failed to demote itself: %s" % msg)
2644
        if node.offline:
2645
          node.offline = False
2646
          result.append(("offline", "clear offline status due to drain"))
2647

    
2648
    # this will trigger configuration file update, if needed
2649
    self.cfg.Update(node)
2650
    # this will trigger job queue propagation or cleanup
2651
    if changed_mc:
2652
      self.context.ReaddNode(node)
2653

    
2654
    return result
2655

    
2656

    
2657
class LUPowercycleNode(NoHooksLU):
2658
  """Powercycles a node.
2659

2660
  """
2661
  _OP_REQP = ["node_name", "force"]
2662
  REQ_BGL = False
2663

    
2664
  def CheckArguments(self):
2665
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2666
    if node_name is None:
2667
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2668
    self.op.node_name = node_name
2669
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
2670
      raise errors.OpPrereqError("The node is the master and the force"
2671
                                 " parameter was not set")
2672

    
2673
  def ExpandNames(self):
2674
    """Locking for PowercycleNode.
2675

2676
    This is a last-resort option and shouldn't block on other
2677
    jobs. Therefore, we grab no locks.
2678

2679
    """
2680
    self.needed_locks = {}
2681

    
2682
  def CheckPrereq(self):
2683
    """Check prerequisites.
2684

2685
    This LU has no prereqs.
2686

2687
    """
2688
    pass
2689

    
2690
  def Exec(self, feedback_fn):
2691
    """Reboots a node.
2692

2693
    """
2694
    result = self.rpc.call_node_powercycle(self.op.node_name,
2695
                                           self.cfg.GetHypervisorType())
2696
    result.Raise("Failed to schedule the reboot")
2697
    return result.payload
2698

    
2699

    
2700
class LUQueryClusterInfo(NoHooksLU):
2701
  """Query cluster configuration.
2702

2703
  """
2704
  _OP_REQP = []
2705
  REQ_BGL = False
2706

    
2707
  def ExpandNames(self):
2708
    self.needed_locks = {}
2709

    
2710
  def CheckPrereq(self):
2711
    """No prerequsites needed for this LU.
2712

2713
    """
2714
    pass
2715

    
2716
  def Exec(self, feedback_fn):
2717
    """Return cluster config.
2718

2719
    """
2720
    cluster = self.cfg.GetClusterInfo()
2721
    result = {
2722
      "software_version": constants.RELEASE_VERSION,
2723
      "protocol_version": constants.PROTOCOL_VERSION,
2724
      "config_version": constants.CONFIG_VERSION,
2725
      "os_api_version": max(constants.OS_API_VERSIONS),
2726
      "export_version": constants.EXPORT_VERSION,
2727
      "architecture": (platform.architecture()[0], platform.machine()),
2728
      "name": cluster.cluster_name,
2729
      "master": cluster.master_node,
2730
      "default_hypervisor": cluster.enabled_hypervisors[0],
2731
      "enabled_hypervisors": cluster.enabled_hypervisors,
2732
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2733
                        for hypervisor_name in cluster.enabled_hypervisors]),
2734
      "beparams": cluster.beparams,
2735
      "nicparams": cluster.nicparams,
2736
      "candidate_pool_size": cluster.candidate_pool_size,
2737
      "master_netdev": cluster.master_netdev,
2738
      "volume_group_name": cluster.volume_group_name,
2739
      "file_storage_dir": cluster.file_storage_dir,
2740
      }
2741

    
2742
    return result
2743

    
2744

    
2745
class LUQueryConfigValues(NoHooksLU):
2746
  """Return configuration values.
2747

2748
  """
2749
  _OP_REQP = []
2750
  REQ_BGL = False
2751
  _FIELDS_DYNAMIC = utils.FieldSet()
2752
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2753

    
2754
  def ExpandNames(self):
2755
    self.needed_locks = {}
2756

    
2757
    _CheckOutputFields(static=self._FIELDS_STATIC,
2758
                       dynamic=self._FIELDS_DYNAMIC,
2759
                       selected=self.op.output_fields)
2760

    
2761
  def CheckPrereq(self):
2762
    """No prerequisites.
2763

2764
    """
2765
    pass
2766

    
2767
  def Exec(self, feedback_fn):
2768
    """Dump a representation of the cluster config to the standard output.
2769

2770
    """
2771
    values = []
2772
    for field in self.op.output_fields:
2773
      if field == "cluster_name":
2774
        entry = self.cfg.GetClusterName()
2775
      elif field == "master_node":
2776
        entry = self.cfg.GetMasterNode()
2777
      elif field == "drain_flag":
2778
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2779
      else:
2780
        raise errors.ParameterError(field)
2781
      values.append(entry)
2782
    return values
2783

    
2784

    
2785
class LUActivateInstanceDisks(NoHooksLU):
2786
  """Bring up an instance's disks.
2787

2788
  """
2789
  _OP_REQP = ["instance_name"]
2790
  REQ_BGL = False
2791

    
2792
  def ExpandNames(self):
2793
    self._ExpandAndLockInstance()
2794
    self.needed_locks[locking.LEVEL_NODE] = []
2795
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2796

    
2797
  def DeclareLocks(self, level):
2798
    if level == locking.LEVEL_NODE:
2799
      self._LockInstancesNodes()
2800

    
2801
  def CheckPrereq(self):
2802
    """Check prerequisites.
2803

2804
    This checks that the instance is in the cluster.
2805

2806
    """
2807
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2808
    assert self.instance is not None, \
2809
      "Cannot retrieve locked instance %s" % self.op.instance_name
2810
    _CheckNodeOnline(self, self.instance.primary_node)
2811

    
2812
  def Exec(self, feedback_fn):
2813
    """Activate the disks.
2814

2815
    """
2816
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2817
    if not disks_ok:
2818
      raise errors.OpExecError("Cannot activate block devices")
2819

    
2820
    return disks_info
2821

    
2822

    
2823
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2824
  """Prepare the block devices for an instance.
2825

2826
  This sets up the block devices on all nodes.
2827

2828
  @type lu: L{LogicalUnit}
2829
  @param lu: the logical unit on whose behalf we execute
2830
  @type instance: L{objects.Instance}
2831
  @param instance: the instance for whose disks we assemble
2832
  @type ignore_secondaries: boolean
2833
  @param ignore_secondaries: if true, errors on secondary nodes
2834
      won't result in an error return from the function
2835
  @return: a tuple of (disks_ok, device_info), where device_info is a
2836
      list of (host, instance_visible_name, node_visible_name) tuples
2837
      with the mapping from node devices to instance devices
2838

2839
  """
2840
  device_info = []
2841
  disks_ok = True
2842
  iname = instance.name
2843
  # With the two passes mechanism we try to reduce the window of
2844
  # opportunity for the race condition of switching DRBD to primary
2845
  # before handshaking occured, but we do not eliminate it
2846

    
2847
  # The proper fix would be to wait (with some limits) until the
2848
  # connection has been made and drbd transitions from WFConnection
2849
  # into any other network-connected state (Connected, SyncTarget,
2850
  # SyncSource, etc.)
2851

    
2852
  # 1st pass, assemble on all nodes in secondary mode
2853
  for inst_disk in instance.disks:
2854
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2855
      lu.cfg.SetDiskID(node_disk, node)
2856
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2857
      msg = result.fail_msg
2858
      if msg:
2859
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2860
                           " (is_primary=False, pass=1): %s",
2861
                           inst_disk.iv_name, node, msg)
2862
        if not ignore_secondaries:
2863
          disks_ok = False
2864

    
2865
  # FIXME: race condition on drbd migration to primary
2866

    
2867
  # 2nd pass, do only the primary node
2868
  for inst_disk in instance.disks:
2869
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2870
      if node != instance.primary_node:
2871
        continue
2872
      lu.cfg.SetDiskID(node_disk, node)
2873
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2874
      msg = result.fail_msg
2875
      if msg:
2876
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2877
                           " (is_primary=True, pass=2): %s",
2878
                           inst_disk.iv_name, node, msg)
2879
        disks_ok = False
2880
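    # record the device as seen from the primary node; result.payload is
    # the node-visible name mentioned in the docstring above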
    device_info.append((instance.primary_node, inst_disk.iv_name,
2881
                        result.payload))
2882

    
2883
  # leave the disks configured for the primary node
2884
  # this is a workaround that would be fixed better by
2885
  # improving the logical/physical id handling
2886
  for disk in instance.disks:
2887
    lu.cfg.SetDiskID(disk, instance.primary_node)
2888

    
2889
  return disks_ok, device_info
2890

    
2891

    
2892
def _StartInstanceDisks(lu, instance, force):
2893
  """Start the disks of an instance.
2894

2895
  """
2896
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2897
                                           ignore_secondaries=force)
2898
  if not disks_ok:
2899
    _ShutdownInstanceDisks(lu, instance)
2900
    if force is not None and not force:
2901
      lu.proc.LogWarning("", hint="If the message above refers to a"
2902
                         " secondary node,"
2903
                         " you can retry the operation using '--force'.")
2904
    raise errors.OpExecError("Disk consistency error")
2905

    
2906

    
2907
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)

def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function verifies that the instance is not running before calling
  _ShutdownInstanceDisks; if the instance is running, an OpExecError is
  raised.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)

def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (a warning is still logged, but they do not cause the function to
  return False).

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result

def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks whether a given node has the needed amount of free
  memory. If the node has less memory, or if we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))

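# Editor's note: a minimal, hypothetical usage sketch for _CheckNodeFreeMemory;
# the 512 MiB figure is illustrative only. The helper raises
# errors.OpPrereqError itself, so callers simply let the exception propagate
# instead of checking a return value:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        512, instance.hypervisor)
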
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)

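# Editor's note on the parameter precedence applied in
# LUStartupInstance.CheckPrereq above: cluster-level hypervisor parameters are
# the base, instance-level parameters override them, and any per-start
# overrides passed in the opcode win. A hypothetical example (keys and values
# are illustrative only, not taken from this module):
#
#   cluster.hvparams[hv]    -> {"kernel_path": "/boot/vmlinuz", "acpi": True}
#   instance.hvparams       -> {"acpi": False}
#   self.hvparams (opcode)  -> {"kernel_path": "/boot/vmlinuz-test"}
#   resulting filled_hvp    -> {"kernel_path": "/boot/vmlinuz-test",
#                               "acpi": False}
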
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)

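# Editor's summary of the reboot-type dispatch in LURebootInstance.Exec above,
# taken directly from the code:
#
#   INSTANCE_REBOOT_SOFT / INSTANCE_REBOOT_HARD
#     -> a single call_instance_reboot RPC on the primary node
#   INSTANCE_REBOOT_FULL
#     -> call_instance_shutdown, then _ShutdownInstanceDisks,
#        _StartInstanceDisks and call_instance_start
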
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)

class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)

class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)

class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name

class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
                                    r"(nic)\.(bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|modes|links|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed or result.fail_msg:
          bad_nodes.append(name)
        else:
          if result.payload:
            live_data.update(result.payload)
          # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    cluster = self.cfg.GetClusterInfo()
    for instance in instance_list:
      iout = []
      i_hv = cluster.FillHV(instance)
      i_be = cluster.FillBE(instance)
      i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
                                 nic.nicparams) for nic in instance.nics]
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "vcpus":
          val = i_be[constants.BE_VCPUS]
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          if instance.nics:
            val = instance.nics[0].ip
          else:
            val = None
        elif field == "nic_mode":
          if instance.nics:
            val = i_nicp[0][constants.NIC_MODE]
          else:
            val = None
        elif field == "nic_link":
          if instance.nics:
            val = i_nicp[0][constants.NIC_LINK]
          else:
            val = None
        elif field == "bridge":
          if (instance.nics and
              i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
            val = i_nicp[0][constants.NIC_LINK]
          else:
            val = None
        elif field == "mac":
          if instance.nics:
            val = instance.nics[0].mac
          else:
            val = None
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "modes":
              val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
            elif st_groups[1] == "links":
              val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
            elif st_groups[1] == "bridges":
              val = []
              for nicp in i_nicp:
                if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
                  val.append(nicp[constants.NIC_LINK])
                else:
                  val.append(None)
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "mode":
                  val = i_nicp[nic_idx][constants.NIC_MODE]
                elif st_groups[1] == "link":
                  val = i_nicp[nic_idx][constants.NIC_LINK]
                elif st_groups[1] == "bridge":
                  nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
                  if nic_mode == constants.NIC_MODE_BRIDGED:
                    val = i_nicp[nic_idx][constants.NIC_LINK]
                  else:
                    val = None
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, ("Declared but unhandled variable parameter '%s'" %
                           field)
        else:
          assert False, "Declared but unhandled parameter '%s'" % field
        iout.append(val)
      output.append(iout)

    return output

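# Editor's sketch of how the variable query fields resolve, based on the
# regular expressions in _FIELDS_STATIC above; the concrete field names are
# illustrative examples only:
#
#   "nic.mac/0"  -> r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
#                   groups ("nic", "mac", "0") -> MAC of the first NIC
#   "disk.sizes" -> r"(disk)\.(sizes)", groups ("disk", "sizes")
#                   -> list with the size of every disk
#   "nic.count"  -> r"(disk|nic)\.(count)", groups ("nic", "count")
#                   -> number of NICs
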
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                           ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

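# Editor's summary of the failover sequence implemented in
# LUFailoverInstance.Exec above (steps taken directly from the code):
#   1. check disk consistency on the target (secondary) node
#   2. shut the instance down on the source node
#   3. shut down its disks everywhere, ignoring errors on the primary node
#   4. point instance.primary_node at the target node and update the config
#   5. if the instance is marked up, reassemble the disks on the new primary
#      and start the instance there
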
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down the instance, in contrast to
  failover, which is done with a shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

    self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                       self.op.live, self.op.cleanup)
    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    env = _BuildInstanceHookEnvByObject(self, instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return env, nl, nl

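# Editor's note on the tasklet pattern used by LUMigrateInstance above and
# LUMigrateNode below: the logical unit itself only handles locking and hooks,
# while the actual migration work is delegated to the TLMigrateInstance
# tasklets listed in self.tasklets, as set up in the respective ExpandNames:
#
#   self._migrater = TLMigrateInstance(self, self.op.instance_name,
#                                      self.op.live, self.op.cleanup)
#   self.tasklets = [self._migrater]
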
class LUMigrateNode(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name", "live"]
  REQ_BGL = False

  def ExpandNames(self):
    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if self.op.node_name is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)

    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    # Create migration tasklets for all instances on this node
    names = []
    tasklets = []

    for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
      logging.debug("Migrating instance %s", inst.name)
      names.append(inst.name)

      tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))

    self.tasklets = tasklets

    # Declare instance locks
    self.needed_locks[locking.LEVEL_INSTANCE] = names

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()