lib/cmdlib.py @ 19bed813

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and to ensure the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing them separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note additional keys will
    be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes for a phase, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called from DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


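# Illustrative sketch, not part of the original Ganeti source: a minimal
# concurrent LU assembled from the pieces above. The class itself and the
# opcode attribute "node_name" are hypothetical; they only show how
# ExpandNames, CheckPrereq and Exec typically cooperate with needed_locks.
class LUExampleNodeNoop(LogicalUnit):
  """Hypothetical LU that locks a single node and does nothing."""
  HPATH = None
  HTYPE = None
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # Canonicalize the name and declare the lock we need at node level
    node = self.cfg.ExpandNodeName(self.op.node_name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
    self.op.node_name = node
    self.needed_locks = {locking.LEVEL_NODE: [node]}

  def CheckPrereq(self):
    # Runs after the node lock is held; must not change cluster state
    _CheckNodeOnline(self, self.op.node_name)

  def Exec(self, feedback_fn):
    feedback_fn("Node %s verified, nothing to do" % self.op.node_name)
    return True

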
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


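# Illustrative sketch, not part of the original Ganeti source: a hypothetical
# tasklet and the way an LU would wire it up through self.tasklets, so that
# the base LogicalUnit.CheckPrereq and Exec drive it instead of LU-level code.
class _ExampleNoopTasklet(Tasklet):
  """Hypothetical tasklet that only reports the instance it was given."""
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name

  def CheckPrereq(self):
    # Idempotent check; locking was already handled by the owning LU
    if self.cfg.GetInstanceInfo(self.instance_name) is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for instance %s" % self.instance_name)


# The owning (hypothetical) LU would typically do, in its ExpandNames:
#
#   self._ExpandAndLockInstance()
#   self.tasklets = [_ExampleNoopTasklet(self, self.op.instance_name)]

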
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


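# Usage sketch, illustrative only (not from the original file): a query-style
# LU would validate the fields requested by its opcode roughly like this; the
# concrete field names below are placeholders:
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "mfree"),
#                      selected=self.op.output_fields)
#
# Any field not covered by the two sets makes it raise errors.OpPrereqError.

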
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


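# Illustrative note, not from the original file: for a hypothetical one-NIC,
# one-disk instance the dict built above contains keys such as OP_TARGET,
# INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_STATUS ("up"/"down"),
# INSTANCE_NIC_COUNT, INSTANCE_NIC0_MAC, INSTANCE_DISK_COUNT and
# INSTANCE_DISK0_SIZE, plus one INSTANCE_BE_*/INSTANCE_HV_* entry per backend
# and hypervisor parameter; the hooks runner is what later adds the GANETI_
# prefix to each key.

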
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

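  # Illustrative note, not part of the original source: assuming a call such as
  #   self._ErrorIf(True, self.ENODELVM, "node1", "volume group %s missing",
  #                 "xenvg")
  # the message reaches feedback_fn as
  #   "  - ERROR: node node1: volume group xenvg missing"
  # when the opcode's error_codes parameter is false, and as the parseable
  #   "  - ERROR:ENODELVM:node:node1:volume group xenvg missing"
  # when it is true. Only ETYPE_ERROR entries mark the operation as failed.
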
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, master_files, drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        the form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    _ErrorIf = self._ErrorIf

    # main result, node_result should be a non-empty dict
    test = not node_result or not isinstance(node_result, dict)
    _ErrorIf(test, self.ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      test = not vglist
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
      if not test:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if not test:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have = (file_name not in master_files) or node_is_mc
        # missing
        test1 = file_name not in remote_cksum
        # invalid checksum
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
        # existing and good
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' missing", file_name)
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' has wrong checksum", file_name)
        # not candidate and this is not a must-have file
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist on non master"
                 " candidates (and the file is outdated)", file_name)
        # all good, except non-master/non-must have combination
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist"
                 " on non master candidates", file_name)

    # checks ssh to any

    test = constants.NV_NODELIST not in node_result
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if node_result[constants.NV_NODELIST]:
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in node_result
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, node_result[constants.NV_NODENETTEST][anode])

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      test = not isinstance(used_minors, (tuple, list))
      _ErrorIf(test, self.ENODEDRBD, node,
               "cannot parse drbd status file: %s", str(used_minors))
      if not test:
        for minor, (iname, must_exist) in drbd_map.items():
          test = minor not in used_minors and must_exist
          _ErrorIf(test, self.ENODEDRBD, node,
                   "drbd minor %d of instance %s is not active",
                   minor, iname)
        for minor in used_minors:
          test = minor not in drbd_map
          _ErrorIf(test, self.ENODEDRBD, node,
                   "unallocated drbd minor %d is in use", minor)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        test = node not in node_vol_is or volume not in node_vol_is[node]
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      test = ((node_current not in node_instance or
               not instance in node_instance[node_current]) and
              node_current not in n_offline)
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node in node_instance:
      if (not node == node_current):
        test = instance in node_instance[node]
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_instance):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for o_inst in node_instance[node]:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all the instances it would have to take over
      # should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = nodeinfo['mfree'] < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        test = instance not in instanceinfo
        _ErrorIf(test, self.ECLUSTERCFG, None,
                 "ghost instance '%s' in temporary DRBD map", instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
        if test:
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      self._VerifyNode(node_i, file_names, local_checksums,
                       nresult, master_files, node_drbd, vg_name)

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
                 utils.SafeEncode(lvdata))
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      test = not isinstance(idata, list)
      _ErrorIf(test, self.ENODEHV, node,
               "rpc call to node failed (instancelist)")
      if test:
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      test = not isinstance(nodeinfo, dict)
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
      if test:
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          test = (constants.NV_VGLIST not in nresult or
                  vg_name not in nresult[constants.NV_VGLIST])
          _ErrorIf(test, self.ENODELVM, node,
                   "node didn't return data for the volume group '%s'"
                   " - it is either missing or broken", vg_name)
          if test:
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check lvm/hypervisor")
        continue

    node_vol_should = {}

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_volume,
                           node_instance, n_offline)
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               self.EINSTANCELAYOUT, instance,
               "instance has multiple secondary nodes", code="WARNING")

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        _ErrorIf(snode not in node_info and snode not in n_offline,
                 self.ENODERPC, snode,
                 "instance %s, connection to secondary node"
                 " failed", instance)

        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)

        if snode in n_offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               ", ".join(inst_nodes_offline))

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_volume)

    feedback_fn("* Verifying remaining instances")
    self._VerifyOrphanInstances(instancelist, node_instance)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_info, instance_cfg)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not self.bad

1449
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1450
    """Analyze the post-hooks' result
1451

1452
    This method analyses the hook result, handles it, and sends some
1453
    nicely-formatted feedback back to the user.
1454

1455
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1456
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1457
    @param hooks_results: the results of the multi-node hooks rpc call
1458
    @param feedback_fn: function used send feedback back to the caller
1459
    @param lu_result: previous Exec result
1460
    @return: the new Exec result, based on the previous result
1461
        and hook results
1462

1463
    """
1464
    # We only really run POST phase hooks, and are only interested in
1465
    # their results
1466
    if phase == constants.HOOKS_PHASE_POST:
1467
      # Used to change hooks' output to proper indentation
1468
      indent_re = re.compile('^', re.M)
1469
      feedback_fn("* Hooks Results")
1470
      assert hooks_results, "invalid result from hooks"
1471

    
1472
      for node_name in hooks_results:
1473
        show_node_header = True
1474
        res = hooks_results[node_name]
1475
        msg = res.fail_msg
1476
        test = msg and not res.offline
1477
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
1478
                      "Communication failure in hooks execution: %s", msg)
1479
        if test:
1480
          # override manually lu_result here as _ErrorIf only
1481
          # overrides self.bad
1482
          lu_result = 1
1483
          continue
1484
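        # Each payload entry below unpacks as (script, hkr, output): the hook
        # script name, its status code (e.g. constants.HKR_FAIL) and whatever
        # the script printed; failed scripts have their output re-indented
        # and passed on to the user via feedback_fn.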
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

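    # At this point nv_dict maps (node, logical volume name) pairs to the
    # owning instance object, i.e. roughly {(node_name, lv_name): instance};
    # only running, network-mirrored instances contribute entries, so an
    # empty dict means there is nothing left to check.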
    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
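        # the size reported by the node is in bytes, while the configuration
        # stores disk sizes in MiB; shifting by 20 bits converts between the
        # two (e.g. 10737418240 >> 20 == 10240 MiB for a 10 GiB disk)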
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance)
          changed.append((instance.name, idx, size))
    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

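    # From here on the master IP is down; the finally: clause below restarts
    # the master role even if updating the configuration or distributing the
    # new known_hosts file fails, so the cluster is not left headless.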
    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member")
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" %
                                   utils.CommaJoin(invalid_hvs))
    else:
      self.hv_list = cluster.enabled_hypervisors

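    # Only hypervisors whose parameters were explicitly changed, or which were
    # newly enabled, are re-validated below; untouched hypervisors keep their
    # existing (already validated) parameters and are not contacted again.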
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self)

    self.cfg.Update(self.cluster)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

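  # Only files that actually exist on the master are pushed; optional files
  # (e.g. the RAPI users file on clusters that never configured it) are
  # silently skipped rather than treated as an error.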
  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

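  # Polling strategy used below: after each status query the loop sleeps
  # min(60, estimated_time_remaining) seconds; up to 10 consecutive RPC
  # failures are tolerated (with a 6s pause each) before giving up, and a
  # result that looks finished but still degraded is re-checked a few more
  # times so a transient state is not reported as a permanent failure.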
  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

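  # Child devices are checked recursively; note that the recursive call does
  # not propagate the ldisk flag, so children are always tested against the
  # overall is_degraded status rather than the local-storage status.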
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "")]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    if self.op.node_name in all_nodes:
      all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

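    # live_data maps each node name to a dict of the dynamic fields gathered
    # via RPC, e.g. roughly {"node1": {"mtotal": 4096, "mfree": 1024, ...}};
    # nodes that failed to answer (or when only static fields were requested)
    # simply get an empty dict, so the field lookup below degrades to None.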
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

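    # Pre-compute, per instance, which logical volumes live on which node
    # (roughly {instance_object: {node_name: [lv_name, ...]}}), so that the
    # "instance" output field below can be resolved with a dictionary lookup
    # instead of recomputing the LV mapping for every volume.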
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    storage_type = self.op.storage_type

    if storage_type not in constants.VALID_STORAGE_FIELDS:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)

    dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*dynamic_fields),
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.op.name = getattr(self.op, "name", None)

    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node as it's only known to the LU
    while "node" in fields:
      fields.remove("node")

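    # field_idx maps each requested field to its column in the rows returned
    # by the storage RPC; e.g. with fields == ["name", "size", "used"] it
    # would be {"name": 0, "size": 1, "used": 2} (field names here are only
    # illustrative), and name_idx is then used to key and sort the rows.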
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)

    self.op.node_name = node_name

    storage_type = self.op.storage_type
    if storage_type not in constants.VALID_STORAGE_FIELDS:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)))

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

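    # Decide whether the new node should become a master candidate: with
    # GetMasterCandidateStats() returning (current_candidates, max_possible),
    # the node is promoted when current_candidates < min(max_possible + 1,
    # candidate_pool_size).  For example (numbers illustrative only), with a
    # pool size of 10, 3 current candidates and 3 eligible nodes, adding a
    # fourth node gives min(3 + 1, 10) == 4 > 3, so it becomes a candidate.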
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
    # the new node will increase mc_max with one, so:
    mc_max = min(mc_max + 1, cp_size)
    self.master_candidate = mc_now < mc_max

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      keyarray.append(utils.ReadFile(i))

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])
    result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

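    # Before the node is (re)added to the configuration, ask the master node
    # to verify ssh/hostname connectivity towards it; any failure here aborts
    # the add before the node is registered in the configuration.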
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node)


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the requested flag changes are valid for the node
    and for the cluster as a whole.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover")

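    # Demoting a master candidate (directly, or implicitly by offlining or
    # draining it) must not shrink the candidate pool below the configured
    # candidate_pool_size; e.g. with a pool size of 10 and exactly 10
    # candidates, the new value of 9 would be too small, so the request is
    # refused unless force is given.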
    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

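    # Each applied change is recorded in 'result'; setting offline or
    # drained auto-demotes a master candidate, and the two flags clear
    # each other, mirroring the exclusivity check in CheckArguments.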
    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "tags": list(cluster.GetTags()),
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
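    # Values are returned in the same order as the requested output_fields,
    # e.g. ["cluster_name", "drain_flag"] yields [<name>, <boolean flag>].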
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
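  # Each assembled disk contributes one tuple to device_info, e.g.
  # (illustrative) ("node1.example.com", "disk/0", "/dev/drbd0") for a
  # DRBD-backed disk as reported by the primary node.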
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
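  # Example: requesting 2048 MiB on a node whose hypervisor reports
  # memory_free=1024 raises OpPrereqError; a missing or non-integer
  # memory_free value is treated as "cannot check the node".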
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

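    # The opcode may carry one-off beparams/hvparams overrides for this
    # start only; e.g. (illustrative) a beparams dict such as
    # {constants.BE_MEMORY: 512} is type-checked here and later handed to
    # the instance_start RPC together with any hvparams overrides.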
    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

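    # Soft and hard reboots are delegated to the node daemon as a single
    # RPC; a full reboot is emulated below as shutdown + disk restart +
    # start, hence the extra error handling in that branch.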
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURecreateInstanceDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    if not isinstance(self.op.disks, list):
      raise errors.OpPrereqError("Invalid disks parameter")
    for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item))

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))
    else:
      for idx in self.op.disks:
        if idx >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
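    # Only the disk indices listed in the opcode are recreated; all other
    # indices end up in to_skip and are left untouched by _CreateDisks (an
    # empty disks list means "recreate everything", as set in CheckPrereq).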
    to_skip = []
    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.op.disks: # disk idx has not been passed in
        to_skip.append(idx)
        continue

    _CreateDisks(self, self.instance, to_skip=to_skip)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

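    # For file-based instances the storage directory on the primary node
    # must be renamed as well, so remember the old path before the
    # configuration is updated with the new name.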
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
                    "serial_no", "ctime", "mtime", "uuid"]
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mo