#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left purely as a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's a need to calculate some locks only after the previous
    ones have been acquired. This function is called just before acquiring
    locks at a particular level, but after acquiring the ones at lower levels,
    and permits such calculations. It can be used to modify self.needed_locks,
    and by default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    "No nodes" should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


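# Purely illustrative sketch (not used anywhere in this module) of how a
# concurrent LU could follow the rules documented in LogicalUnit above:
# REQ_BGL is disabled, ExpandNames declares the locks, DeclareLocks computes
# the node locks once the instance lock is held, and CheckPrereq/Exec do the
# checking and the work. The class itself and the "instance_name" opcode
# field it expects are hypothetical, shown only as a usage example.
class _LUExampleInstanceNoop(LogicalUnit):
  """Example-only LU: locks one instance and its nodes, then does nothing."""
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expands self.op.instance_name and declares the instance-level lock
    self._ExpandAndLockInstance()
    # node locks are computed in DeclareLocks, after the instance is locked
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    feedback_fn("Would operate on instance %s" % self.instance.name)
    return self.instance.name

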
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


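# Purely illustrative sketch (not used by this module) of the tasklet pattern
# described above: the tasklet does the checking and the work, while the
# owning LU only handles locking and, in its ExpandNames, sets something like
# "self.tasklets = [_TaskletExampleNoop(self, "hello")]" so that the default
# LogicalUnit.CheckPrereq/Exec drive it. The class and its "message" argument
# are hypothetical.
class _TaskletExampleNoop(Tasklet):
  """Example-only tasklet which just reports a message."""
  def __init__(self, lu, message):
    Tasklet.__init__(self, lu)
    self.message = message

  def CheckPrereq(self):
    # nothing to verify for this no-op example
    pass

  def Exec(self, feedback_fn):
    feedback_fn("Tasklet says: %s" % self.message)

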
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance-related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


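# Illustrative helper (not called by anything in this module): shows how the
# hook environment above is assembled for a hypothetical single-NIC,
# single-disk instance. All literal values below are made up for the example;
# only the INSTANCE_* key layout is what _BuildInstanceHookEnv produces.
def _ExampleInstanceHookEnv():
  """Return a sample hook environment, for documentation purposes only."""
  env = _BuildInstanceHookEnv(name="inst1.example.com",
                              primary_node="node1.example.com",
                              secondary_nodes=["node2.example.com"],
                              os_type="debootstrap+default",
                              status=True,
                              memory=512,
                              vcpus=1,
                              nics=[("198.51.100.10", "aa:00:00:11:22:33",
                                     constants.NIC_MODE_BRIDGED, "xen-br0")],
                              disk_template="drbd",
                              disks=[(10240, "rw")],
                              bep={constants.BE_MEMORY: 512},
                              hvp={},
                              hypervisor_name="xen-pvm")
  # e.g. env["INSTANCE_NIC0_BRIDGE"] == "xen-br0" and
  # env["INSTANCE_DISK_COUNT"] == 1
  return env

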
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance-related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  instances = []

  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
    if node_name == inst.primary_node:
      instances.append(inst)

  return instances


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  instances = []

  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
    if node_name in inst.secondary_nodes:
      instances.append(inst)

  return instances


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
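      # Worked example (illustrative numbers only): if this node is secondary
      # for instances i1 (BE_MEMORY=512) and i2 (BE_MEMORY=1024), both with
      # node B as primary and auto_balance enabled, then needed_mem for the
      # B entry below is 1536; if this node reports mfree of, say, 1024, the
      # check flags an error, since it could not absorb B's instances if B
      # failed.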
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if msg:
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
1392
  """Verifies the cluster disks status.
1393

1394
  """
1395
  _OP_REQP = []
1396
  REQ_BGL = False
1397

    
1398
  def ExpandNames(self):
1399
    self.needed_locks = {
1400
      locking.LEVEL_NODE: locking.ALL_SET,
1401
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1402
    }
1403
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1404

    
1405
  def CheckPrereq(self):
1406
    """Check prerequisites.
1407

1408
    This has no prerequisites.
1409

1410
    """
1411
    pass
1412

    
1413
  def Exec(self, feedback_fn):
1414
    """Verify integrity of cluster disks.
1415

1416
    @rtype: tuple of three items
1417
    @return: a tuple of (dict of node-to-node_error, list of instances
1418
        which need activate-disks, dict of instance: (node, volume) for
1419
        missing volumes
1420

1421
    """
1422
    result = res_nodes, res_instances, res_missing = {}, [], {}
1423

    
1424
    vg_name = self.cfg.GetVGName()
1425
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1426
    instances = [self.cfg.GetInstanceInfo(name)
1427
                 for name in self.cfg.GetInstanceList()]
1428

    
1429
    nv_dict = {}
1430
    for inst in instances:
1431
      inst_lvs = {}
1432
      if (not inst.admin_up or
1433
          inst.disk_template not in constants.DTS_NET_MIRROR):
1434
        continue
1435
      inst.MapLVsByNode(inst_lvs)
1436
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1437
      for node, vol_list in inst_lvs.iteritems():
1438
        for vol in vol_list:
1439
          nv_dict[(node, vol)] = inst
1440

    
1441
    if not nv_dict:
1442
      return result
1443

    
1444
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1445

    
1446
    for node in nodes:
1447
      # node_volume
1448
      node_res = node_lvs[node]
1449
      if node_res.offline:
1450
        continue
1451
      msg = node_res.fail_msg
1452
      if msg:
1453
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1454
        res_nodes[node] = msg
1455
        continue
1456

    
1457
      lvs = node_res.payload
1458
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1459
        inst = nv_dict.pop((node, lv_name), None)
1460
        if (not lv_online and inst is not None
1461
            and inst.name not in res_instances):
1462
          res_instances.append(inst.name)
1463

    
1464
    # any leftover items in nv_dict are missing LVs, let's arrange the
1465
    # data better
1466
    for key, inst in nv_dict.iteritems():
1467
      if inst.name not in res_missing:
1468
        res_missing[inst.name] = []
1469
      res_missing[inst.name].append(key)
1470

    
1471
    return result
1472

    
1473

    
1474
class LURenameCluster(LogicalUnit):
1475
  """Rename the cluster.
1476

1477
  """
1478
  HPATH = "cluster-rename"
1479
  HTYPE = constants.HTYPE_CLUSTER
1480
  _OP_REQP = ["name"]
1481

    
1482
  def BuildHooksEnv(self):
1483
    """Build hooks env.
1484

1485
    """
1486
    env = {
1487
      "OP_TARGET": self.cfg.GetClusterName(),
1488
      "NEW_NAME": self.op.name,
1489
      }
1490
    mn = self.cfg.GetMasterNode()
1491
    return env, [mn], [mn]
1492

    
1493
  def CheckPrereq(self):
1494
    """Verify that the passed name is a valid one.
1495

1496
    """
1497
    hostname = utils.HostInfo(self.op.name)
1498

    
1499
    new_name = hostname.name
1500
    self.ip = new_ip = hostname.ip
1501
    old_name = self.cfg.GetClusterName()
1502
    old_ip = self.cfg.GetMasterIP()
1503
    if new_name == old_name and new_ip == old_ip:
1504
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1505
                                 " cluster has changed")
1506
    if new_ip != old_ip:
1507
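      # refuse the rename if something already answers on the new IP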
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1508
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1509
                                   " reachable on the network. Aborting." %
1510
                                   new_ip)
1511

    
1512
    self.op.name = new_name
1513

    
1514
  def Exec(self, feedback_fn):
1515
    """Rename the cluster.
1516

1517
    """
1518
    clustername = self.op.name
1519
    ip = self.ip
1520

    
1521
    # shutdown the master IP
1522
    master = self.cfg.GetMasterNode()
1523
    result = self.rpc.call_node_stop_master(master, False)
1524
    result.Raise("Could not disable the master role")
1525

    
1526
    try:
1527
      cluster = self.cfg.GetClusterInfo()
1528
      cluster.cluster_name = clustername
1529
      cluster.master_ip = ip
1530
      self.cfg.Update(cluster)
1531

    
1532
      # update the known hosts file
1533
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1534
      node_list = self.cfg.GetNodeList()
1535
      try:
1536
        node_list.remove(master)
1537
      except ValueError:
1538
        pass
1539
      result = self.rpc.call_upload_file(node_list,
1540
                                         constants.SSH_KNOWN_HOSTS_FILE)
1541
      for to_node, to_result in result.iteritems():
1542
        msg = to_result.fail_msg
1543
        if msg:
1544
          msg = ("Copy of file %s to node %s failed: %s" %
1545
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1546
          self.proc.LogWarning(msg)
1547

    
1548
    finally:
1549
      result = self.rpc.call_node_start_master(master, False, False)
1550
      msg = result.fail_msg
1551
      if msg:
1552
        self.LogWarning("Could not re-enable the master role on"
1553
                        " the master, please restart manually: %s", msg)
1554

    
1555

    
1556
def _RecursiveCheckIfLVMBased(disk):
1557
  """Check if the given disk or its children are lvm-based.
1558

1559
  @type disk: L{objects.Disk}
1560
  @param disk: the disk to check
1561
  @rtype: boolean
1562
  @return: boolean indicating whether a LD_LV dev_type was found or not
1563

1564
  """
1565
  if disk.children:
1566
    for chdisk in disk.children:
1567
      if _RecursiveCheckIfLVMBased(chdisk):
1568
        return True
1569
  return disk.dev_type == constants.LD_LV
1570

    
1571

    
1572
class LUSetClusterParams(LogicalUnit):
1573
  """Change the parameters of the cluster.
1574

1575
  """
1576
  HPATH = "cluster-modify"
1577
  HTYPE = constants.HTYPE_CLUSTER
1578
  _OP_REQP = []
1579
  REQ_BGL = False
1580

    
1581
  def CheckArguments(self):
1582
    """Check parameters
1583

1584
    """
1585
    if not hasattr(self.op, "candidate_pool_size"):
1586
      self.op.candidate_pool_size = None
1587
    if self.op.candidate_pool_size is not None:
1588
      try:
1589
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1590
      except (ValueError, TypeError), err:
1591
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1592
                                   str(err))
1593
      if self.op.candidate_pool_size < 1:
1594
        raise errors.OpPrereqError("At least one master candidate needed")
1595

    
1596
  def ExpandNames(self):
1597
    # FIXME: in the future maybe other cluster params won't require checking on
1598
    # all nodes to be modified.
1599
    self.needed_locks = {
1600
      locking.LEVEL_NODE: locking.ALL_SET,
1601
    }
1602
    self.share_locks[locking.LEVEL_NODE] = 1
1603

    
1604
  def BuildHooksEnv(self):
1605
    """Build hooks env.
1606

1607
    """
1608
    env = {
1609
      "OP_TARGET": self.cfg.GetClusterName(),
1610
      "NEW_VG_NAME": self.op.vg_name,
1611
      }
1612
    mn = self.cfg.GetMasterNode()
1613
    return env, [mn], [mn]
1614

    
1615
  def CheckPrereq(self):
1616
    """Check prerequisites.
1617

1618
    This checks whether the given params don't conflict and
1619
    if the given volume group is valid.
1620

1621
    """
1622
    if self.op.vg_name is not None and not self.op.vg_name:
1623
      instances = self.cfg.GetAllInstancesInfo().values()
1624
      for inst in instances:
1625
        for disk in inst.disks:
1626
          if _RecursiveCheckIfLVMBased(disk):
1627
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1628
                                       " lvm-based instances exist")
1629

    
1630
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1631

    
1632
    # if vg_name is not None, check the given volume group on all nodes
1633
    if self.op.vg_name:
1634
      vglist = self.rpc.call_vg_list(node_list)
1635
      for node in node_list:
1636
        msg = vglist[node].fail_msg
1637
        if msg:
1638
          # ignoring down node
1639
          self.LogWarning("Error while gathering data on node %s"
1640
                          " (ignoring node): %s", node, msg)
1641
          continue
1642
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1643
                                              self.op.vg_name,
1644
                                              constants.MIN_VG_SIZE)
1645
        if vgstatus:
1646
          raise errors.OpPrereqError("Error on node '%s': %s" %
1647
                                     (node, vgstatus))
1648

    
1649
    self.cluster = cluster = self.cfg.GetClusterInfo()
1650
    # validate params changes
1651
    if self.op.beparams:
1652
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1653
      self.new_beparams = objects.FillDict(
1654
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1655

    
1656
    if self.op.nicparams:
1657
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1658
      self.new_nicparams = objects.FillDict(
1659
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1660
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1661

    
1662
    # hypervisor list/parameters
1663
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1664
    if self.op.hvparams:
1665
      if not isinstance(self.op.hvparams, dict):
1666
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1667
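      # merge the new parameters over the existing per-hypervisor dicts
      # instead of replacing them wholesale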
      for hv_name, hv_dict in self.op.hvparams.items():
1668
        if hv_name not in self.new_hvparams:
1669
          self.new_hvparams[hv_name] = hv_dict
1670
        else:
1671
          self.new_hvparams[hv_name].update(hv_dict)
1672

    
1673
    if self.op.enabled_hypervisors is not None:
1674
      self.hv_list = self.op.enabled_hypervisors
1675
      if not self.hv_list:
1676
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1677
                                   " least one member")
1678
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1679
      if invalid_hvs:
1680
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1681
                                   " entries: %s" % invalid_hvs)
1682
    else:
1683
      self.hv_list = cluster.enabled_hypervisors
1684

    
1685
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1686
      # either the enabled list has changed, or the parameters have, validate
1687
      for hv_name, hv_params in self.new_hvparams.items():
1688
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1689
            (self.op.enabled_hypervisors and
1690
             hv_name in self.op.enabled_hypervisors)):
1691
          # either this is a new hypervisor, or its parameters have changed
1692
          hv_class = hypervisor.GetHypervisor(hv_name)
1693
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1694
          hv_class.CheckParameterSyntax(hv_params)
1695
          _CheckHVParams(self, node_list, hv_name, hv_params)
1696

    
1697
  def Exec(self, feedback_fn):
1698
    """Change the parameters of the cluster.
1699

1700
    """
1701
    if self.op.vg_name is not None:
1702
      new_volume = self.op.vg_name
1703
      if not new_volume:
1704
        new_volume = None
1705
      if new_volume != self.cfg.GetVGName():
1706
        self.cfg.SetVGName(new_volume)
1707
      else:
1708
        feedback_fn("Cluster LVM configuration already in desired"
1709
                    " state, not changing")
1710
    if self.op.hvparams:
1711
      self.cluster.hvparams = self.new_hvparams
1712
    if self.op.enabled_hypervisors is not None:
1713
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1714
    if self.op.beparams:
1715
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1716
    if self.op.nicparams:
1717
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1718

    
1719
    if self.op.candidate_pool_size is not None:
1720
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1721
      # we need to update the pool size here, otherwise the save will fail
1722
      _AdjustCandidatePool(self)
1723

    
1724
    self.cfg.Update(self.cluster)
1725

    
1726

    
1727
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1728
  """Distribute additional files which are part of the cluster configuration.
1729

1730
  ConfigWriter takes care of distributing the config and ssconf files, but
1731
  there are more files which should be distributed to all nodes. This function
1732
  makes sure those are copied.
1733

1734
  @param lu: calling logical unit
1735
  @param additional_nodes: list of nodes not in the config to distribute to
1736

1737
  """
1738
  # 1. Gather target nodes
1739
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1740
  dist_nodes = lu.cfg.GetNodeList()
1741
  if additional_nodes is not None:
1742
    dist_nodes.extend(additional_nodes)
1743
  if myself.name in dist_nodes:
1744
    dist_nodes.remove(myself.name)
1745
  # 2. Gather files to distribute
1746
  dist_files = set([constants.ETC_HOSTS,
1747
                    constants.SSH_KNOWN_HOSTS_FILE,
1748
                    constants.RAPI_CERT_FILE,
1749
                    constants.RAPI_USERS_FILE,
1750
                    constants.HMAC_CLUSTER_KEY,
1751
                   ])
1752

    
1753
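  # ask every enabled hypervisor which extra files it needs distributed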
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1754
  for hv_name in enabled_hypervisors:
1755
    hv_class = hypervisor.GetHypervisor(hv_name)
1756
    dist_files.update(hv_class.GetAncillaryFiles())
1757

    
1758
  # 3. Perform the files upload
1759
  for fname in dist_files:
1760
    if os.path.exists(fname):
1761
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1762
      for to_node, to_result in result.items():
1763
        msg = to_result.fail_msg
1764
        if msg:
1765
          msg = ("Copy of file %s to node %s failed: %s" %
1766
                 (fname, to_node, msg))
1767
          lu.proc.LogWarning(msg)
1768

    
1769

    
1770
class LURedistributeConfig(NoHooksLU):
1771
  """Force the redistribution of cluster configuration.
1772

1773
  This is a very simple LU.
1774

1775
  """
1776
  _OP_REQP = []
1777
  REQ_BGL = False
1778

    
1779
  def ExpandNames(self):
1780
    self.needed_locks = {
1781
      locking.LEVEL_NODE: locking.ALL_SET,
1782
    }
1783
    self.share_locks[locking.LEVEL_NODE] = 1
1784

    
1785
  def CheckPrereq(self):
1786
    """Check prerequisites.
1787

1788
    """
1789

    
1790
  def Exec(self, feedback_fn):
1791
    """Redistribute the configuration.
1792

1793
    """
1794
    self.cfg.Update(self.cfg.GetClusterInfo())
1795
    _RedistributeAncillaryFiles(self)
1796

    
1797

    
1798
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1799
  """Sleep and poll for an instance's disk to sync.
1800

1801
  """
1802
  if not instance.disks:
1803
    return True
1804

    
1805
  if not oneshot:
1806
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1807

    
1808
  node = instance.primary_node
1809

    
1810
  for dev in instance.disks:
1811
    lu.cfg.SetDiskID(dev, node)
1812

    
1813
  retries = 0
1814
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1815
  while True:
1816
    max_time = 0
1817
    done = True
1818
    cumul_degraded = False
1819
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1820
    msg = rstats.fail_msg
1821
    if msg:
1822
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1823
      retries += 1
1824
      if retries >= 10:
1825
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1826
                                 " aborting." % node)
1827
      time.sleep(6)
1828
      continue
1829
    rstats = rstats.payload
1830
    retries = 0
1831
    for i, mstat in enumerate(rstats):
1832
      if mstat is None:
1833
        lu.LogWarning("Can't compute data for node %s/%s",
1834
                           node, instance.disks[i].iv_name)
1835
        continue
1836

    
1837
      cumul_degraded = (cumul_degraded or
1838
                        (mstat.is_degraded and mstat.sync_percent is None))
1839
      if mstat.sync_percent is not None:
1840
        done = False
1841
        if mstat.estimated_time is not None:
1842
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
1843
          max_time = mstat.estimated_time
1844
        else:
1845
          rem_time = "no time estimate"
1846
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1847
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))
1848

    
1849
    # if we're done but degraded, let's do a few small retries, to
1850
    # make sure we see a stable and not transient situation; therefore
1851
    # we force restart of the loop
1852
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1853
      logging.info("Degraded disks found, %d retries left", degr_retries)
1854
      degr_retries -= 1
1855
      time.sleep(1)
1856
      continue
1857

    
1858
    if done or oneshot:
1859
      break
1860

    
1861
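    # poll again after at most a minute, or sooner if the reported sync
    # time estimate is shorter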
    time.sleep(min(60, max_time))
1862

    
1863
  if done:
1864
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1865
  return not cumul_degraded
1866

    
1867

    
1868
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1869
  """Check that mirrors are not degraded.
1870

1871
  The ldisk parameter, if True, will change the test from the
1872
  is_degraded attribute (which represents overall non-ok status for
1873
  the device(s)) to the ldisk (representing the local storage status).
1874

1875
  """
1876
  lu.cfg.SetDiskID(dev, node)
1877

    
1878
  result = True
1879

    
1880
  if on_primary or dev.AssembleOnSecondary():
1881
    rstats = lu.rpc.call_blockdev_find(node, dev)
1882
    msg = rstats.fail_msg
1883
    if msg:
1884
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1885
      result = False
1886
    elif not rstats.payload:
1887
      lu.LogWarning("Can't find disk on node %s", node)
1888
      result = False
1889
    else:
1890
      if ldisk:
1891
        result = result and not rstats.payload.ldisk_degraded
1892
      else:
1893
        result = result and not rstats.payload.is_degraded
1894

    
1895
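  # recurse into child devices; the children are always checked with the
  # default is_degraded test (ldisk is not propagated)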
  if dev.children:
1896
    for child in dev.children:
1897
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1898

    
1899
  return result
1900

    
1901

    
1902
class LUDiagnoseOS(NoHooksLU):
1903
  """Logical unit for OS diagnose/query.
1904

1905
  """
1906
  _OP_REQP = ["output_fields", "names"]
1907
  REQ_BGL = False
1908
  _FIELDS_STATIC = utils.FieldSet()
1909
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1910

    
1911
  def ExpandNames(self):
1912
    if self.op.names:
1913
      raise errors.OpPrereqError("Selective OS query not supported")
1914

    
1915
    _CheckOutputFields(static=self._FIELDS_STATIC,
1916
                       dynamic=self._FIELDS_DYNAMIC,
1917
                       selected=self.op.output_fields)
1918

    
1919
    # Lock all nodes, in shared mode
1920
    # Temporary removal of locks, should be reverted later
1921
    # TODO: reintroduce locks when they are lighter-weight
1922
    self.needed_locks = {}
1923
    #self.share_locks[locking.LEVEL_NODE] = 1
1924
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1925

    
1926
  def CheckPrereq(self):
1927
    """Check prerequisites.
1928

1929
    """
1930

    
1931
  @staticmethod
1932
  def _DiagnoseByOS(node_list, rlist):
1933
    """Remaps a per-node return list into an a per-os per-node dictionary
1934

1935
    @param node_list: a list with the names of all nodes
1936
    @param rlist: a map with node names as keys and OS objects as values
1937

1938
    @rtype: dict
1939
    @return: a dictionary with osnames as keys and, as values, a map with
1940
        nodes as keys and lists of (path, status, diagnose) tuples as
        values, e.g.::
1941

1942
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1943
                                     (/srv/..., False, "invalid api")],
1944
                           "node2": [(/srv/..., True, "")]}
1945
          }
1946

1947
    """
1948
    all_os = {}
1949
    # we build here the list of nodes that didn't fail the RPC (at RPC
1950
    # level), so that nodes with a non-responding node daemon don't
1951
    # make all OSes invalid
1952
    good_nodes = [node_name for node_name in rlist
1953
                  if not rlist[node_name].fail_msg]
1954
    for node_name, nr in rlist.items():
1955
      if nr.fail_msg or not nr.payload:
1956
        continue
1957
      for name, path, status, diagnose in nr.payload:
1958
        if name not in all_os:
1959
          # build a list of nodes for this os containing empty lists
1960
          # for each node in node_list
1961
          all_os[name] = {}
1962
          for nname in good_nodes:
1963
            all_os[name][nname] = []
1964
        all_os[name][node_name].append((path, status, diagnose))
1965
    return all_os
1966

    
1967
  def Exec(self, feedback_fn):
1968
    """Compute the list of OSes.
1969

1970
    """
1971
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1972
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1973
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1974
    output = []
1975
    for os_name, os_data in pol.items():
1976
      row = []
1977
      for field in self.op.output_fields:
1978
        if field == "name":
1979
          val = os_name
1980
        elif field == "valid":
1981
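          # the OS counts as valid only if every node reported it and the
          # first occurrence on each node has a good status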
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1982
        elif field == "node_status":
1983
          # this is just a copy of the dict
1984
          val = {}
1985
          for node_name, nos_list in os_data.items():
1986
            val[node_name] = nos_list
1987
        else:
1988
          raise errors.ParameterError(field)
1989
        row.append(val)
1990
      output.append(row)
1991

    
1992
    return output
1993

    
1994

    
1995
class LURemoveNode(LogicalUnit):
1996
  """Logical unit for removing a node.
1997

1998
  """
1999
  HPATH = "node-remove"
2000
  HTYPE = constants.HTYPE_NODE
2001
  _OP_REQP = ["node_name"]
2002

    
2003
  def BuildHooksEnv(self):
2004
    """Build hooks env.
2005

2006
    This doesn't run on the target node in the pre phase as a failed
2007
    node would then be impossible to remove.
2008

2009
    """
2010
    env = {
2011
      "OP_TARGET": self.op.node_name,
2012
      "NODE_NAME": self.op.node_name,
2013
      }
2014
    all_nodes = self.cfg.GetNodeList()
2015
    all_nodes.remove(self.op.node_name)
2016
    return env, all_nodes, all_nodes
2017

    
2018
  def CheckPrereq(self):
2019
    """Check prerequisites.
2020

2021
    This checks:
2022
     - the node exists in the configuration
2023
     - it does not have primary or secondary instances
2024
     - it's not the master
2025

2026
    Any errors are signaled by raising errors.OpPrereqError.
2027

2028
    """
2029
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2030
    if node is None:
2031
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2032

    
2033
    instance_list = self.cfg.GetInstanceList()
2034

    
2035
    masternode = self.cfg.GetMasterNode()
2036
    if node.name == masternode:
2037
      raise errors.OpPrereqError("Node is the master node,"
2038
                                 " you need to failover first.")
2039

    
2040
    for instance_name in instance_list:
2041
      instance = self.cfg.GetInstanceInfo(instance_name)
2042
      if node.name in instance.all_nodes:
2043
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2044
                                   " please remove first." % instance_name)
2045
    self.op.node_name = node.name
2046
    self.node = node
2047

    
2048
  def Exec(self, feedback_fn):
2049
    """Removes the node from the cluster.
2050

2051
    """
2052
    node = self.node
2053
    logging.info("Stopping the node daemon and removing configs from node %s",
2054
                 node.name)
2055

    
2056
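    # remove the node from the configuration and locking context before
    # telling it to leave the cluster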
    self.context.RemoveNode(node.name)
2057

    
2058
    result = self.rpc.call_node_leave_cluster(node.name)
2059
    msg = result.fail_msg
2060
    if msg:
2061
      self.LogWarning("Errors encountered on the remote node while leaving"
2062
                      " the cluster: %s", msg)
2063

    
2064
    # Promote nodes to master candidate as needed
2065
    _AdjustCandidatePool(self)
2066

    
2067

    
2068
class LUQueryNodes(NoHooksLU):
2069
  """Logical unit for querying nodes.
2070

2071
  """
2072
  _OP_REQP = ["output_fields", "names", "use_locking"]
2073
  REQ_BGL = False
2074
  _FIELDS_DYNAMIC = utils.FieldSet(
2075
    "dtotal", "dfree",
2076
    "mtotal", "mnode", "mfree",
2077
    "bootid",
2078
    "ctotal", "cnodes", "csockets",
2079
    )
2080

    
2081
  _FIELDS_STATIC = utils.FieldSet(
2082
    "name", "pinst_cnt", "sinst_cnt",
2083
    "pinst_list", "sinst_list",
2084
    "pip", "sip", "tags",
2085
    "serial_no",
2086
    "master_candidate",
2087
    "master",
2088
    "offline",
2089
    "drained",
2090
    "role",
2091
    )
2092

    
2093
  def ExpandNames(self):
2094
    _CheckOutputFields(static=self._FIELDS_STATIC,
2095
                       dynamic=self._FIELDS_DYNAMIC,
2096
                       selected=self.op.output_fields)
2097

    
2098
    self.needed_locks = {}
2099
    self.share_locks[locking.LEVEL_NODE] = 1
2100

    
2101
    if self.op.names:
2102
      self.wanted = _GetWantedNodes(self, self.op.names)
2103
    else:
2104
      self.wanted = locking.ALL_SET
2105

    
2106
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2107
    self.do_locking = self.do_node_query and self.op.use_locking
2108
    if self.do_locking:
2109
      # if we don't request only static fields, we need to lock the nodes
2110
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2111

    
2112

    
2113
  def CheckPrereq(self):
2114
    """Check prerequisites.
2115

2116
    """
2117
    # The validation of the node list is done in _GetWantedNodes if
2118
    # the list is non-empty; if it is empty, there is no validation to do
2119
    pass
2120

    
2121
  def Exec(self, feedback_fn):
2122
    """Computes the list of nodes and their attributes.
2123

2124
    """
2125
    all_info = self.cfg.GetAllNodesInfo()
2126
    if self.do_locking:
2127
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2128
    elif self.wanted != locking.ALL_SET:
2129
      nodenames = self.wanted
2130
      missing = set(nodenames).difference(all_info.keys())
2131
      if missing:
2132
        raise errors.OpExecError(
2133
          "Some nodes were removed before retrieving their data: %s" % missing)
2134
    else:
2135
      nodenames = all_info.keys()
2136

    
2137
    nodenames = utils.NiceSort(nodenames)
2138
    nodelist = [all_info[name] for name in nodenames]
2139

    
2140
    # begin data gathering
2141

    
2142
    if self.do_node_query:
2143
      live_data = {}
2144
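      # fetch live memory/disk/cpu information from all nodes in one call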
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2145
                                          self.cfg.GetHypervisorType())
2146
      for name in nodenames:
2147
        nodeinfo = node_data[name]
2148
        if not nodeinfo.fail_msg and nodeinfo.payload:
2149
          nodeinfo = nodeinfo.payload
2150
          fn = utils.TryConvert
2151
          live_data[name] = {
2152
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2153
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2154
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2155
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2156
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2157
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2158
            "bootid": nodeinfo.get('bootid', None),
2159
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2160
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2161
            }
2162
        else:
2163
          live_data[name] = {}
2164
    else:
2165
      live_data = dict.fromkeys(nodenames, {})
2166

    
2167
    node_to_primary = dict([(name, set()) for name in nodenames])
2168
    node_to_secondary = dict([(name, set()) for name in nodenames])
2169

    
2170
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2171
                             "sinst_cnt", "sinst_list"))
2172
    if inst_fields & frozenset(self.op.output_fields):
2173
      instancelist = self.cfg.GetInstanceList()
2174

    
2175
      for instance_name in instancelist:
2176
        inst = self.cfg.GetInstanceInfo(instance_name)
2177
        if inst.primary_node in node_to_primary:
2178
          node_to_primary[inst.primary_node].add(inst.name)
2179
        for secnode in inst.secondary_nodes:
2180
          if secnode in node_to_secondary:
2181
            node_to_secondary[secnode].add(inst.name)
2182

    
2183
    master_node = self.cfg.GetMasterNode()
2184

    
2185
    # end data gathering
2186

    
2187
    output = []
2188
    for node in nodelist:
2189
      node_output = []
2190
      for field in self.op.output_fields:
2191
        if field == "name":
2192
          val = node.name
2193
        elif field == "pinst_list":
2194
          val = list(node_to_primary[node.name])
2195
        elif field == "sinst_list":
2196
          val = list(node_to_secondary[node.name])
2197
        elif field == "pinst_cnt":
2198
          val = len(node_to_primary[node.name])
2199
        elif field == "sinst_cnt":
2200
          val = len(node_to_secondary[node.name])
2201
        elif field == "pip":
2202
          val = node.primary_ip
2203
        elif field == "sip":
2204
          val = node.secondary_ip
2205
        elif field == "tags":
2206
          val = list(node.GetTags())
2207
        elif field == "serial_no":
2208
          val = node.serial_no
2209
        elif field == "master_candidate":
2210
          val = node.master_candidate
2211
        elif field == "master":
2212
          val = node.name == master_node
2213
        elif field == "offline":
2214
          val = node.offline
2215
        elif field == "drained":
2216
          val = node.drained
2217
        elif self._FIELDS_DYNAMIC.Matches(field):
2218
          val = live_data[node.name].get(field, None)
2219
        elif field == "role":
2220
          if node.name == master_node:
2221
            val = "M"
2222
          elif node.master_candidate:
2223
            val = "C"
2224
          elif node.drained:
2225
            val = "D"
2226
          elif node.offline:
2227
            val = "O"
2228
          else:
2229
            val = "R"
2230
        else:
2231
          raise errors.ParameterError(field)
2232
        node_output.append(val)
2233
      output.append(node_output)
2234

    
2235
    return output
2236

    
2237

    
2238
class LUQueryNodeVolumes(NoHooksLU):
2239
  """Logical unit for getting volumes on node(s).
2240

2241
  """
2242
  _OP_REQP = ["nodes", "output_fields"]
2243
  REQ_BGL = False
2244
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2245
  _FIELDS_STATIC = utils.FieldSet("node")
2246

    
2247
  def ExpandNames(self):
2248
    _CheckOutputFields(static=self._FIELDS_STATIC,
2249
                       dynamic=self._FIELDS_DYNAMIC,
2250
                       selected=self.op.output_fields)
2251

    
2252
    self.needed_locks = {}
2253
    self.share_locks[locking.LEVEL_NODE] = 1
2254
    if not self.op.nodes:
2255
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2256
    else:
2257
      self.needed_locks[locking.LEVEL_NODE] = \
2258
        _GetWantedNodes(self, self.op.nodes)
2259

    
2260
  def CheckPrereq(self):
2261
    """Check prerequisites.
2262

2263
    This checks that the fields required are valid output fields.
2264

2265
    """
2266
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2267

    
2268
  def Exec(self, feedback_fn):
2269
    """Computes the list of nodes and their attributes.
2270

2271
    """
2272
    nodenames = self.nodes
2273
    volumes = self.rpc.call_node_volumes(nodenames)
2274

    
2275
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2276
             in self.cfg.GetInstanceList()]
2277

    
2278
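    # precompute the per-node LV lists of each instance, used below to
    # fill in the "instance" output field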
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2279

    
2280
    output = []
2281
    for node in nodenames:
2282
      nresult = volumes[node]
2283
      if nresult.offline:
2284
        continue
2285
      msg = nresult.fail_msg
2286
      if msg:
2287
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2288
        continue
2289

    
2290
      node_vols = nresult.payload[:]
2291
      node_vols.sort(key=lambda vol: vol['dev'])
2292

    
2293
      for vol in node_vols:
2294
        node_output = []
2295
        for field in self.op.output_fields:
2296
          if field == "node":
2297
            val = node
2298
          elif field == "phys":
2299
            val = vol['dev']
2300
          elif field == "vg":
2301
            val = vol['vg']
2302
          elif field == "name":
2303
            val = vol['name']
2304
          elif field == "size":
2305
            val = int(float(vol['size']))
2306
          elif field == "instance":
2307
            for inst in ilist:
2308
              if node not in lv_by_node[inst]:
2309
                continue
2310
              if vol['name'] in lv_by_node[inst][node]:
2311
                val = inst.name
2312
                break
2313
            else:
2314
              val = '-'
2315
          else:
2316
            raise errors.ParameterError(field)
2317
          node_output.append(str(val))
2318

    
2319
        output.append(node_output)
2320

    
2321
    return output
2322

    
2323

    
2324
class LUQueryNodeStorage(NoHooksLU):
2325
  """Logical unit for getting information on storage units on node(s).
2326

2327
  """
2328
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
2329
  REQ_BGL = False
2330
  _FIELDS_STATIC = utils.FieldSet("node")
2331

    
2332
  def ExpandNames(self):
2333
    storage_type = self.op.storage_type
2334

    
2335
    if storage_type not in constants.VALID_STORAGE_FIELDS:
2336
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2337

    
2338
    dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2339

    
2340
    _CheckOutputFields(static=self._FIELDS_STATIC,
2341
                       dynamic=utils.FieldSet(*dynamic_fields),
2342
                       selected=self.op.output_fields)
2343

    
2344
    self.needed_locks = {}
2345
    self.share_locks[locking.LEVEL_NODE] = 1
2346

    
2347
    if self.op.nodes:
2348
      self.needed_locks[locking.LEVEL_NODE] = \
2349
        _GetWantedNodes(self, self.op.nodes)
2350
    else:
2351
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2352

    
2353
  def CheckPrereq(self):
2354
    """Check prerequisites.
2355

2356
    This checks that the fields required are valid output fields.
2357

2358
    """
2359
    self.op.name = getattr(self.op, "name", None)
2360

    
2361
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2362

    
2363
  def Exec(self, feedback_fn):
2364
    """Computes the list of nodes and their attributes.
2365

2366
    """
2367
    # Always get name to sort by
2368
    if constants.SF_NAME in self.op.output_fields:
2369
      fields = self.op.output_fields[:]
2370
    else:
2371
      fields = [constants.SF_NAME] + self.op.output_fields
2372

    
2373
    # Never ask for node as it's only known to the LU
2374
    while "node" in fields:
2375
      fields.remove("node")
2376

    
2377
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2378
    name_idx = field_idx[constants.SF_NAME]
2379

    
2380
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2381
    data = self.rpc.call_storage_list(self.nodes,
2382
                                      self.op.storage_type, st_args,
2383
                                      self.op.name, fields)
2384

    
2385
    result = []
2386

    
2387
    for node in utils.NiceSort(self.nodes):
2388
      nresult = data[node]
2389
      if nresult.offline:
2390
        continue
2391

    
2392
      msg = nresult.fail_msg
2393
      if msg:
2394
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2395
        continue
2396

    
2397
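      # index the result rows by their name column so that they can be
      # emitted in sorted order below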
      rows = dict([(row[name_idx], row) for row in nresult.payload])
2398

    
2399
      for name in utils.NiceSort(rows.keys()):
2400
        row = rows[name]
2401

    
2402
        out = []
2403

    
2404
        for field in self.op.output_fields:
2405
          if field == "node":
2406
            val = node
2407
          elif field in field_idx:
2408
            val = row[field_idx[field]]
2409
          else:
2410
            raise errors.ParameterError(field)
2411

    
2412
          out.append(val)
2413

    
2414
        result.append(out)
2415

    
2416
    return result
2417

    
2418

    
2419
class LUModifyNodeStorage(NoHooksLU):
2420
  """Logical unit for modifying a storage volume on a node.
2421

2422
  """
2423
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2424
  REQ_BGL = False
2425

    
2426
  def CheckArguments(self):
2427
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2428
    if node_name is None:
2429
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2430

    
2431
    self.op.node_name = node_name
2432

    
2433
    storage_type = self.op.storage_type
2434
    if storage_type not in constants.VALID_STORAGE_FIELDS:
2435
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2436

    
2437
  def ExpandNames(self):
2438
    self.needed_locks = {
2439
      locking.LEVEL_NODE: self.op.node_name,
2440
      }
2441

    
2442
  def CheckPrereq(self):
2443
    """Check prerequisites.
2444

2445
    """
2446
    storage_type = self.op.storage_type
2447

    
2448
    try:
2449
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2450
    except KeyError:
2451
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
2452
                                 " modified" % storage_type)
2453

    
2454
    diff = set(self.op.changes.keys()) - modifiable
2455
    if diff:
2456
      raise errors.OpPrereqError("The following fields can not be modified for"
2457
                                 " storage units of type '%s': %r" %
2458
                                 (storage_type, list(diff)))
2459

    
2460
  def Exec(self, feedback_fn):
2461
    """Computes the list of nodes and their attributes.
2462

2463
    """
2464
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2465
    result = self.rpc.call_storage_modify(self.op.node_name,
2466
                                          self.op.storage_type, st_args,
2467
                                          self.op.name, self.op.changes)
2468
    result.Raise("Failed to modify storage unit '%s' on %s" %
2469
                 (self.op.name, self.op.node_name))
2470

    
2471

    
2472
class LUAddNode(LogicalUnit):
2473
  """Logical unit for adding node to the cluster.
2474

2475
  """
2476
  HPATH = "node-add"
2477
  HTYPE = constants.HTYPE_NODE
2478
  _OP_REQP = ["node_name"]
2479

    
2480
  def BuildHooksEnv(self):
2481
    """Build hooks env.
2482

2483
    This will run on all nodes before, and on all nodes + the new node after.
2484

2485
    """
2486
    env = {
2487
      "OP_TARGET": self.op.node_name,
2488
      "NODE_NAME": self.op.node_name,
2489
      "NODE_PIP": self.op.primary_ip,
2490
      "NODE_SIP": self.op.secondary_ip,
2491
      }
2492
    nodes_0 = self.cfg.GetNodeList()
2493
    nodes_1 = nodes_0 + [self.op.node_name, ]
2494
    return env, nodes_0, nodes_1
2495

    
2496
  def CheckPrereq(self):
2497
    """Check prerequisites.
2498

2499
    This checks:
2500
     - the new node is not already in the config
2501
     - it is resolvable
2502
     - its parameters (single/dual homed) matches the cluster
2503

2504
    Any errors are signaled by raising errors.OpPrereqError.
2505

2506
    """
2507
    node_name = self.op.node_name
2508
    cfg = self.cfg
2509

    
2510
    dns_data = utils.HostInfo(node_name)
2511

    
2512
    node = dns_data.name
2513
    primary_ip = self.op.primary_ip = dns_data.ip
2514
    secondary_ip = getattr(self.op, "secondary_ip", None)
2515
    if secondary_ip is None:
2516
      secondary_ip = primary_ip
2517
    if not utils.IsValidIP(secondary_ip):
2518
      raise errors.OpPrereqError("Invalid secondary IP given")
2519
    self.op.secondary_ip = secondary_ip
2520

    
2521
    node_list = cfg.GetNodeList()
2522
    if not self.op.readd and node in node_list:
2523
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2524
                                 node)
2525
    elif self.op.readd and node not in node_list:
2526
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2527

    
2528
    for existing_node_name in node_list:
2529
      existing_node = cfg.GetNodeInfo(existing_node_name)
2530

    
2531
      if self.op.readd and node == existing_node_name:
2532
        if (existing_node.primary_ip != primary_ip or
2533
            existing_node.secondary_ip != secondary_ip):
2534
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2535
                                     " address configuration as before")
2536
        continue
2537

    
2538
      if (existing_node.primary_ip == primary_ip or
2539
          existing_node.secondary_ip == primary_ip or
2540
          existing_node.primary_ip == secondary_ip or
2541
          existing_node.secondary_ip == secondary_ip):
2542
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2543
                                   " existing node %s" % existing_node.name)
2544

    
2545
    # check that the type of the node (single versus dual homed) is the
2546
    # same as for the master
2547
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2548
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2549
    newbie_singlehomed = secondary_ip == primary_ip
2550
    if master_singlehomed != newbie_singlehomed:
2551
      if master_singlehomed:
2552
        raise errors.OpPrereqError("The master has no private ip but the"
2553
                                   " new node has one")
2554
      else:
2555
        raise errors.OpPrereqError("The master has a private ip but the"
2556
                                   " new node doesn't have one")
2557

    
2558
    # checks reachability
2559
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2560
      raise errors.OpPrereqError("Node not reachable by ping")
2561

    
2562
    if not newbie_singlehomed:
2563
      # check reachability from my secondary ip to newbie's secondary ip
2564
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2565
                           source=myself.secondary_ip):
2566
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2567
                                   " based ping to noded port")
2568

    
2569
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2570
    if self.op.readd:
2571
      exceptions = [node]
2572
    else:
2573
      exceptions = []
2574
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2575
    # the new node will increase mc_max by one, so:
2576
    mc_max = min(mc_max + 1, cp_size)
2577
    self.master_candidate = mc_now < mc_max
2578

    
2579
    if self.op.readd:
2580
      self.new_node = self.cfg.GetNodeInfo(node)
2581
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2582
    else:
2583
      self.new_node = objects.Node(name=node,
2584
                                   primary_ip=primary_ip,
2585
                                   secondary_ip=secondary_ip,
2586
                                   master_candidate=self.master_candidate,
2587
                                   offline=False, drained=False)
2588

    
2589
  def Exec(self, feedback_fn):
2590
    """Adds the new node to the cluster.
2591

2592
    """
2593
    new_node = self.new_node
2594
    node = new_node.name
2595

    
2596
    # for re-adds, reset the offline/drained/master-candidate flags;
2597
    # we need to reset here, otherwise offline would prevent RPC calls
2598
    # later in the procedure; this also means that if the re-add
2599
    # fails, we are left with a non-offlined, broken node
2600
    if self.op.readd:
2601
      new_node.drained = new_node.offline = False
2602
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2603
      # if we demote the node, we do cleanup later in the procedure
2604
      new_node.master_candidate = self.master_candidate
2605

    
2606
    # notify the user about any possible mc promotion
2607
    if new_node.master_candidate:
2608
      self.LogInfo("Node will be a master candidate")
2609

    
2610
    # check connectivity
2611
    result = self.rpc.call_version([node])[node]
2612
    result.Raise("Can't get version information from node %s" % node)
2613
    if constants.PROTOCOL_VERSION == result.payload:
2614
      logging.info("Communication to node %s fine, sw version %s match",
2615
                   node, result.payload)
2616
    else:
2617
      raise errors.OpExecError("Version mismatch master version %s,"
2618
                               " node version %s" %
2619
                               (constants.PROTOCOL_VERSION, result.payload))
2620

    
2621
    # setup ssh on node
2622
    logging.info("Copy ssh key to node %s", node)
2623
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2624
    keyarray = []
2625
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2626
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2627
                priv_key, pub_key]
2628

    
2629
    for i in keyfiles:
2630
      f = open(i, 'r')
2631
      try:
2632
        keyarray.append(f.read())
2633
      finally:
2634
        f.close()
2635

    
2636
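    # transfer the host keys and the cluster ssh keypair read above to
    # the new node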
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2637
                                    keyarray[2],
2638
                                    keyarray[3], keyarray[4], keyarray[5])
2639
    result.Raise("Cannot transfer ssh keys to the new node")
2640

    
2641
    # Add node to our /etc/hosts, and add key to known_hosts
2642
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2643
      utils.AddHostToEtcHosts(new_node.name)
2644

    
2645
    if new_node.secondary_ip != new_node.primary_ip:
2646
      result = self.rpc.call_node_has_ip_address(new_node.name,
2647
                                                 new_node.secondary_ip)
2648
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2649
                   prereq=True)
2650
      if not result.payload:
2651
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2652
                                 " you gave (%s). Please fix and re-run this"
2653
                                 " command." % new_node.secondary_ip)
2654

    
2655
    node_verify_list = [self.cfg.GetMasterNode()]
2656
    node_verify_param = {
2657
      'nodelist': [node],
2658
      # TODO: do a node-net-test as well?
2659
    }
2660

    
2661
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2662
                                       self.cfg.GetClusterName())
2663
    for verifier in node_verify_list:
2664
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2665
      nl_payload = result[verifier].payload['nodelist']
2666
      if nl_payload:
2667
        for failed in nl_payload:
2668
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2669
                      (verifier, nl_payload[failed]))
2670
        raise errors.OpExecError("ssh/hostname verification failed.")
2671

    
2672
    if self.op.readd:
2673
      _RedistributeAncillaryFiles(self)
2674
      self.context.ReaddNode(new_node)
2675
      # make sure we redistribute the config
2676
      self.cfg.Update(new_node)
2677
      # and make sure the new node will not have old files around
2678
      if not new_node.master_candidate:
2679
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2680
        msg = result.fail_msg
2681
        if msg:
2682
          self.LogWarning("Node failed to demote itself from master"
2683
                          " candidate status: %s" % msg)
2684
    else:
2685
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2686
      self.context.AddNode(new_node)
2687

    
2688

    
2689
class LUSetNodeParams(LogicalUnit):
2690
  """Modifies the parameters of a node.
2691

2692
  """
2693
  HPATH = "node-modify"
2694
  HTYPE = constants.HTYPE_NODE
2695
  _OP_REQP = ["node_name"]
2696
  REQ_BGL = False
2697

    
2698
  def CheckArguments(self):
2699
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2700
    if node_name is None:
2701
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2702
    self.op.node_name = node_name
2703
    _CheckBooleanOpField(self.op, 'master_candidate')
2704
    _CheckBooleanOpField(self.op, 'offline')
2705
    _CheckBooleanOpField(self.op, 'drained')
2706
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2707
    if all_mods.count(None) == 3:
2708
      raise errors.OpPrereqError("Please pass at least one modification")
2709
    if all_mods.count(True) > 1:
2710
      raise errors.OpPrereqError("Can't set the node into more than one"
2711
                                 " state at the same time")
2712

    
2713
  def ExpandNames(self):
2714
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2715

    
2716
  def BuildHooksEnv(self):
2717
    """Build hooks env.
2718

2719
    This runs on the master node.
2720

2721
    """
2722
    env = {
2723
      "OP_TARGET": self.op.node_name,
2724
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2725
      "OFFLINE": str(self.op.offline),
2726
      "DRAINED": str(self.op.drained),
2727
      }
2728
    nl = [self.cfg.GetMasterNode(),
2729
          self.op.node_name]
2730
    return env, nl, nl
2731

    
2732
  def CheckPrereq(self):
2733
    """Check prerequisites.
2734

2735
    This only checks the instance list against the existing names.
2736

2737
    """
2738
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2739

    
2740
    if ((self.op.master_candidate == False or self.op.offline == True or
2741
         self.op.drained == True) and node.master_candidate):
2742
      # we will demote the node from master_candidate
2743
      if self.op.node_name == self.cfg.GetMasterNode():
2744
        raise errors.OpPrereqError("The master node has to be a"
2745
                                   " master candidate, online and not drained")
2746
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2747
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2748
      if num_candidates <= cp_size:
2749
        msg = ("Not enough master candidates (desired"
2750
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2751
        if self.op.force:
2752
          self.LogWarning(msg)
2753
        else:
2754
          raise errors.OpPrereqError(msg)
2755

    
2756
    if (self.op.master_candidate == True and
2757
        ((node.offline and not self.op.offline == False) or
2758
         (node.drained and not self.op.drained == False))):
2759
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2760
                                 " to master_candidate" % node.name)
2761

    
2762
    return
2763

    
2764
  def Exec(self, feedback_fn):
2765
    """Modifies a node.
2766

2767
    """
2768
    node = self.node
2769

    
2770
    result = []
2771
    changed_mc = False
2772

    
2773
    if self.op.offline is not None:
2774
      node.offline = self.op.offline
2775
      result.append(("offline", str(self.op.offline)))
2776
      if self.op.offline == True:
2777
        if node.master_candidate:
2778
          node.master_candidate = False
2779
          changed_mc = True
2780
          result.append(("master_candidate", "auto-demotion due to offline"))
2781
        if node.drained:
2782
          node.drained = False
2783
          result.append(("drained", "clear drained status due to offline"))
2784

    
2785
    if self.op.master_candidate is not None:
2786
      node.master_candidate = self.op.master_candidate
2787
      changed_mc = True
2788
      result.append(("master_candidate", str(self.op.master_candidate)))
2789
      if self.op.master_candidate == False:
2790
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2791
        msg = rrc.fail_msg
2792
        if msg:
2793
          self.LogWarning("Node failed to demote itself: %s" % msg)
2794

    
2795
    if self.op.drained is not None:
2796
      node.drained = self.op.drained
2797
      result.append(("drained", str(self.op.drained)))
2798
      if self.op.drained == True:
2799
        if node.master_candidate:
2800
          node.master_candidate = False
2801
          changed_mc = True
2802
          result.append(("master_candidate", "auto-demotion due to drain"))
2803
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2804
          msg = rrc.fail_msg
2805
          if msg:
2806
            self.LogWarning("Node failed to demote itself: %s" % msg)
2807
        if node.offline:
2808
          node.offline = False
2809
          result.append(("offline", "clear offline status due to drain"))
2810

    
2811
    # this will trigger configuration file update, if needed
2812
    self.cfg.Update(node)
2813
    # this will trigger job queue propagation or cleanup
2814
    if changed_mc:
2815
      self.context.ReaddNode(node)
2816

    
2817
    return result
2818

    
2819

    
2820
class LUPowercycleNode(NoHooksLU):
2821
  """Powercycles a node.
2822

2823
  """
2824
  _OP_REQP = ["node_name", "force"]
2825
  REQ_BGL = False
2826

    
2827
  def CheckArguments(self):
2828
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2829
    if node_name is None:
2830
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2831
    self.op.node_name = node_name
2832
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
2833
      raise errors.OpPrereqError("The node is the master and the force"
2834
                                 " parameter was not set")
2835

    
2836
  def ExpandNames(self):
2837
    """Locking for PowercycleNode.
2838

2839
    This is a last-resort option and shouldn't block on other
2840
    jobs. Therefore, we grab no locks.
2841

2842
    """
2843
    self.needed_locks = {}
2844

    
2845
  def CheckPrereq(self):
2846
    """Check prerequisites.
2847

2848
    This LU has no prereqs.
2849

2850
    """
2851
    pass
2852

    
2853
  def Exec(self, feedback_fn):
2854
    """Reboots a node.
2855

2856
    """
2857
    result = self.rpc.call_node_powercycle(self.op.node_name,
2858
                                           self.cfg.GetHypervisorType())
2859
    result.Raise("Failed to schedule the reboot")
2860
    return result.payload
2861

    
2862

    
2863
class LUQueryClusterInfo(NoHooksLU):
2864
  """Query cluster configuration.
2865

2866
  """
2867
  _OP_REQP = []
2868
  REQ_BGL = False
2869

    
2870
  def ExpandNames(self):
2871
    self.needed_locks = {}
2872

    
2873
  def CheckPrereq(self):
2874
    """No prerequsites needed for this LU.
2875

2876
    """
2877
    pass
2878

    
2879
  def Exec(self, feedback_fn):
2880
    """Return cluster config.
2881

2882
    """
2883
    cluster = self.cfg.GetClusterInfo()
2884
    result = {
2885
      "software_version": constants.RELEASE_VERSION,
2886
      "protocol_version": constants.PROTOCOL_VERSION,
2887
      "config_version": constants.CONFIG_VERSION,
2888
      "os_api_version": max(constants.OS_API_VERSIONS),
2889
      "export_version": constants.EXPORT_VERSION,
2890
      "architecture": (platform.architecture()[0], platform.machine()),
2891
      "name": cluster.cluster_name,
2892
      "master": cluster.master_node,
2893
      "default_hypervisor": cluster.enabled_hypervisors[0],
2894
      "enabled_hypervisors": cluster.enabled_hypervisors,
2895
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2896
                        for hypervisor_name in cluster.enabled_hypervisors]),
2897
      "beparams": cluster.beparams,
2898
      "nicparams": cluster.nicparams,
2899
      "candidate_pool_size": cluster.candidate_pool_size,
2900
      "master_netdev": cluster.master_netdev,
2901
      "volume_group_name": cluster.volume_group_name,
2902
      "file_storage_dir": cluster.file_storage_dir,
2903
      }
2904

    
2905
    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values
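
# Illustrative sketch: Exec above returns one value per requested field, in
# the order given by op.output_fields.  The cluster and node names here are
# made-up example values:
#
#   output_fields = ["cluster_name", "master_node", "drain_flag"]
#   # -> ["cluster.example.com", "node1.example.com", False]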


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: a (disks_ok, device_info) tuple, where device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples with the
      mapping from node devices to instance devices, and disks_ok is False
      if the operation failed on any node whose errors are not ignored

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
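
# Illustrative sketch of the return value built above: a (disks_ok,
# device_info) pair, with one device_info entry appended per disk during the
# second (primary) pass.  The concrete values are made-up examples:
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   # device_info == [("node1.example.com", "disk/0", <assemble payload>), ...]
#   # disks_ok is False if any assembly failed and was not ignored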


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  errors on any other node always make the function return failure.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result
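
# Illustrative sketch of the ignore_primary semantics implemented above: it
# only suppresses failures reported by the primary node, while a failure on
# any other node still turns the return value to False:
#
#   if not _ShutdownInstanceDisks(lu, instance, ignore_primary=True):
#     # at least one non-primary node failed to shut down its block devices
#     ...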


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
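
# Illustrative sketch of a typical call, mirroring the one made in
# LUStartupInstance.CheckPrereq below (bep being the instance's filled
# beparams dict):
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
#   # raises errors.OpPrereqError if the node cannot report its free memory
#   # or reports less than the requested amount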


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
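
# Illustrative sketch of the one-off overrides read in CheckPrereq above; the
# opcode class name and the "memory" key are assumptions about the usual
# opcode/backend parameter definitions, which live outside this file:
#
#   op = opcodes.OpStartupInstance(instance_name="inst1.example.com",
#                                  force=False, beparams={"memory": 512})
#   # the dict is type-checked via utils.ForceDictType against
#   # constants.BES_PARAMETER_TYPES before being passed to instance_start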


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
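
# Summary of how the three accepted reboot types map onto the Exec paths
# above:
#   INSTANCE_REBOOT_SOFT / INSTANCE_REBOOT_HARD
#       -> a single call_instance_reboot RPC on the primary node
#   INSTANCE_REBOOT_FULL
#       -> call_instance_shutdown, then the disks are shut down and
#          re-assembled, then call_instance_start; if that start fails the
#          disks are shut down again and OpExecError is raised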


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
                                    r"(nic)\.(bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|modes|links|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed or result.fail_msg:
          bad_nodes.append(name)
        else:
          if result.payload:
            live_data.update(result.payload)
          # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    cluster = self.cfg.GetClusterInfo()
    for instance in instance_list:
      iout = []
      i_hv = cluster.FillHV(instance)
      i_be = cluster.FillBE(instance)
      i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
                                 nic.nicparams) for nic in instance.nics]
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "vcpus":
          val = i_be[constants.BE_VCPUS]
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          if instance.nics:
            val = instance.nics[0].ip
          else:
            val = None
        elif field == "nic_mode":
          if instance.nics:
            val = i_nicp[0][constants.NIC_MODE]
          else:
            val = None
        elif field == "nic_link":
          if instance.nics:
            val = i_nicp[0][constants.NIC_LINK]
          else:
            val = None
        elif field == "bridge":
          if (instance.nics and
              i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
            val = i_nicp[0][constants.NIC_LINK]
          else:
            val = None
        elif field == "mac":
          if instance.nics:
            val = instance.nics[0].mac
          else:
            val = None
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "modes":
              val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
            elif st_groups[1] == "links":
              val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
            elif st_groups[1] == "bridges":
              val = []
              for nicp in i_nicp:
                if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
                  val.append(nicp[constants.NIC_LINK])
                else:
                  val.append(None)
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "mode":
                  val = i_nicp[nic_idx][constants.NIC_MODE]
                elif st_groups[1] == "link":
                  val = i_nicp[nic_idx][constants.NIC_LINK]
                elif st_groups[1] == "bridge":
                  nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
                  if nic_mode == constants.NIC_MODE_BRIDGED:
                    val = i_nicp[nic_idx][constants.NIC_LINK]
                  else:
                    val = None
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, ("Declared but unhandled variable parameter '%s'" %
                           field)
        else:
          assert False, "Declared but unhandled parameter '%s'" % field
        iout.append(val)
      output.append(iout)

    return output
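
# Illustrative sketch of how the parameterized fields declared in
# _FIELDS_STATIC are dispatched above, using two of the declared field forms
# as examples:
#
#   "nic.mac/0"  -> st_match.groups() == ("nic", "mac", "0")
#                   -> the index-based branch returns instance.nics[0].mac
#   "disk.sizes" -> st_match.groups() == ("disk", "sizes")
#                   -> returns [disk.size for disk in instance.disks]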


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]