root / lib / cmdlib.py @ e623dbe3

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


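# Illustrative sketch (not part of the original module): a minimal concrete LU
# following the rules listed in the LogicalUnit docstring above. The opcode
# and its "instance_name" parameter are hypothetical; only the structure
# (HPATH/HTYPE, _OP_REQP, ExpandNames, CheckPrereq, BuildHooksEnv, Exec)
# mirrors the base class contract.
#
#   class LUPingInstance(LogicalUnit):
#     HPATH = "instance-ping"
#     HTYPE = constants.HTYPE_INSTANCE
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # expand the instance name and lock only that instance
#       self._ExpandAndLockInstance()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def BuildHooksEnv(self):
#       env = _BuildInstanceHookEnvByObject(self, self.instance)
#       return env, [], [self.instance.primary_node]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Instance %s is configured" % self.instance.name)
#       return self.instance.name

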
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


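# Illustrative sketch (not part of the original module): an LU can delegate
# its work to tasklets instead of implementing CheckPrereq/Exec itself, as
# described in the LogicalUnit and Tasklet docstrings. The tasklet class and
# opcode fields below are hypothetical.
#
#   class _PingTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       if self.cfg.ExpandInstanceName(self.instance_name) is None:
#         raise errors.OpPrereqError("Instance '%s' not known" %
#                                    self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Checked instance %s" % self.instance_name)
#
#   # inside an LU's ExpandNames, after declaring the needed locks:
#   #   self.tasklets = [_PingTasklet(self, name)
#   #                    for name in self.op.instance_names]

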
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


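# Illustrative sketch (not part of the original module): a query-style LU
# would typically call the helper above from CheckArguments or CheckPrereq;
# the field names here are examples only.
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "mfree"),
#                      selected=self.op.output_fields)
#   # raises errors.OpPrereqError("Unknown output fields selected: ...")
#   # if output_fields contains anything outside the two sets

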
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


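# Illustrative sketch (not part of the original module): for a hypothetical
# single-NIC, single-disk instance the function above returns a dict along
# the lines of (the hooks runner later adds the GANETI_ prefix):
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debootstrap",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_DISK_TEMPLATE": "drbd",
#     "INSTANCE_HYPERVISOR": "xen-pvm",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_NIC0_MODE": "bridged",
#     "INSTANCE_NIC0_LINK": "xen-br0",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     "INSTANCE_DISK0_MODE": "rw",
#     # ...plus one INSTANCE_BE_* entry per backend parameter and one
#     # INSTANCE_HV_* entry per hypervisor parameter
#   }

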
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)

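  # Illustrative sketch (not part of the original module): for
  # ecode=self.ENODEVERSION, item="node1", msg="incompatible protocol
  # versions", the line handed to feedback_fn looks like, depending on
  # self.op.error_codes:
  #
  #   error_codes=True:    - ERROR:ENODEVERSION:node:node1:incompatible ...
  #   error_codes=False:   - ERROR: node node1: incompatible protocol versions
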
  def _ErrorIf(self, cond, *args, **kwargs):
926
    """Log an error message if the passed condition is True.
927

928
    """
929
    cond = bool(cond) or self.op.debug_simulate_errors
930
    if cond:
931
      self._Error(*args, **kwargs)
932
    # do not mark the operation as failed for WARN cases only
933
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
934
      self.bad = self.bad or cond
935

    
936
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
937
                  node_result, master_files, drbd_map, vg_name):
938
    """Run multiple tests against a node.
939

940
    Test list:
941

942
      - compares ganeti version
943
      - checks vg existence and size > 20G
944
      - checks config file checksum
945
      - checks ssh to other nodes
946

947
    @type nodeinfo: L{objects.Node}
948
    @param nodeinfo: the node to check
949
    @param file_list: required list of files
950
    @param local_cksum: dictionary of local files and their checksums
951
    @param node_result: the results from the node
952
    @param master_files: list of files that only masters should have
953
    @param drbd_map: the useddrbd minors for this node, in
954
        form of minor: (instance, must_exist) which correspond to instances
955
        and their running status
956
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
957

958
    """
959
    node = nodeinfo.name
960
    _ErrorIf = self._ErrorIf
961

    
962
    # main result, node_result should be a non-empty dict
963
    test = not node_result or not isinstance(node_result, dict)
964
    _ErrorIf(test, self.ENODERPC, node,
965
                  "unable to verify node: no data returned")
966
    if test:
967
      return
968

    
969
    # compares ganeti version
970
    local_version = constants.PROTOCOL_VERSION
971
    remote_version = node_result.get('version', None)
972
    test = not (remote_version and
973
                isinstance(remote_version, (list, tuple)) and
974
                len(remote_version) == 2)
975
    _ErrorIf(test, self.ENODERPC, node,
976
             "connection to node returned invalid data")
977
    if test:
978
      return
979

    
980
    test = local_version != remote_version[0]
981
    _ErrorIf(test, self.ENODEVERSION, node,
982
             "incompatible protocol versions: master %s,"
983
             " node %s", local_version, remote_version[0])
984
    if test:
985
      return
986

    
987
    # node seems compatible, we can actually try to look into its results
988

    
989
    # full package version
990
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
991
                  self.ENODEVERSION, node,
992
                  "software version mismatch: master %s, node %s",
993
                  constants.RELEASE_VERSION, remote_version[1],
994
                  code=self.ETYPE_WARNING)
995

    
996
    # checks vg existence and size > 20G
997
    if vg_name is not None:
998
      vglist = node_result.get(constants.NV_VGLIST, None)
999
      test = not vglist
1000
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1001
      if not test:
1002
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1003
                                              constants.MIN_VG_SIZE)
1004
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1005

    
1006
    # checks config file checksum
1007

    
1008
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
1009
    test = not isinstance(remote_cksum, dict)
1010
    _ErrorIf(test, self.ENODEFILECHECK, node,
1011
             "node hasn't returned file checksum data")
1012
    if not test:
1013
      for file_name in file_list:
1014
        node_is_mc = nodeinfo.master_candidate
1015
        must_have = (file_name not in master_files) or node_is_mc
1016
        # missing
1017
        test1 = file_name not in remote_cksum
1018
        # invalid checksum
1019
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1020
        # existing and good
1021
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1022
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1023
                 "file '%s' missing", file_name)
1024
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1025
                 "file '%s' has wrong checksum", file_name)
1026
        # not candidate and this is not a must-have file
1027
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1028
                 "file '%s' should not exist on non master"
1029
                 " candidates (and the file is outdated)", file_name)
1030
        # all good, except non-master/non-must have combination
1031
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1032
                 "file '%s' should not exist"
1033
                 " on non master candidates", file_name)
1034

    
1035
    # checks ssh to any
1036

    
1037
    test = constants.NV_NODELIST not in node_result
1038
    _ErrorIf(test, self.ENODESSH, node,
1039
             "node hasn't returned node ssh connectivity data")
1040
    if not test:
1041
      if node_result[constants.NV_NODELIST]:
1042
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1043
          _ErrorIf(True, self.ENODESSH, node,
1044
                   "ssh communication with node '%s': %s", a_node, a_msg)
1045

    
1046
    test = constants.NV_NODENETTEST not in node_result
1047
    _ErrorIf(test, self.ENODENET, node,
1048
             "node hasn't returned node tcp connectivity data")
1049
    if not test:
1050
      if node_result[constants.NV_NODENETTEST]:
1051
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1052
        for anode in nlist:
1053
          _ErrorIf(True, self.ENODENET, node,
1054
                   "tcp communication with node '%s': %s",
1055
                   anode, node_result[constants.NV_NODENETTEST][anode])
1056

    
1057
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1058
    if isinstance(hyp_result, dict):
1059
      for hv_name, hv_result in hyp_result.iteritems():
1060
        test = hv_result is not None
1061
        _ErrorIf(test, self.ENODEHV, node,
1062
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1063

    
1064
    # check used drbd list
1065
    if vg_name is not None:
1066
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
1067
      test = not isinstance(used_minors, (tuple, list))
1068
      _ErrorIf(test, self.ENODEDRBD, node,
1069
               "cannot parse drbd status file: %s", str(used_minors))
1070
      if not test:
1071
        for minor, (iname, must_exist) in drbd_map.items():
1072
          test = minor not in used_minors and must_exist
1073
          _ErrorIf(test, self.ENODEDRBD, node,
1074
                   "drbd minor %d of instance %s is not active",
1075
                   minor, iname)
1076
        for minor in used_minors:
1077
          test = minor not in drbd_map
1078
          _ErrorIf(test, self.ENODEDRBD, node,
1079
                   "unallocated drbd minor %d is in use", minor)
1080

    
1081
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1082
                      node_instance, n_offline):
1083
    """Verify an instance.
1084

1085
    This function checks to see if the required block devices are
1086
    available on the instance's node.
1087

1088
    """
1089
    _ErrorIf = self._ErrorIf
1090
    node_current = instanceconfig.primary_node
1091

    
1092
    node_vol_should = {}
1093
    instanceconfig.MapLVsByNode(node_vol_should)
1094

    
1095
    for node in node_vol_should:
1096
      if node in n_offline:
1097
        # ignore missing volumes on offline nodes
1098
        continue
1099
      for volume in node_vol_should[node]:
1100
        test = node not in node_vol_is or volume not in node_vol_is[node]
1101
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1102
                 "volume %s missing on node %s", volume, node)
1103

    
1104
    if instanceconfig.admin_up:
1105
      test = ((node_current not in node_instance or
1106
               not instance in node_instance[node_current]) and
1107
              node_current not in n_offline)
1108
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1109
               "instance not running on its primary node %s",
1110
               node_current)
1111

    
1112
    for node in node_instance:
1113
      if (not node == node_current):
1114
        test = instance in node_instance[node]
1115
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1116
                 "instance should not run on node %s", node)
1117

    
1118
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1119
    """Verify if there are any unknown volumes in the cluster.
1120

1121
    The .os, .swap and backup volumes are ignored. All other volumes are
1122
    reported as unknown.
1123

1124
    """
1125
    for node in node_vol_is:
1126
      for volume in node_vol_is[node]:
1127
        test = (node not in node_vol_should or
1128
                volume not in node_vol_should[node])
1129
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1130
                      "volume %s is unknown", volume)
1131

    
1132
  def _VerifyOrphanInstances(self, instancelist, node_instance):
1133
    """Verify the list of running instances.
1134

1135
    This checks what instances are running but unknown to the cluster.
1136

1137
    """
1138
    for node in node_instance:
1139
      for o_inst in node_instance[node]:
1140
        test = o_inst not in instancelist
1141
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1142
                      "instance %s on node %s should not exist", o_inst, node)
1143

    
1144
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1145
    """Verify N+1 Memory Resilience.
1146

1147
    Check that if one single node dies we can still start all the instances it
1148
    was primary for.
1149

1150
    """
1151
    for node, nodeinfo in node_info.iteritems():
1152
      # This code checks that every node which is now listed as secondary has
1153
      # enough memory to host all instances it is supposed to should a single
1154
      # other node in the cluster fail.
1155
      # FIXME: not ready for failover to an arbitrary node
1156
      # FIXME: does not support file-backed instances
1157
      # WARNING: we currently take into account down instances as well as up
1158
      # ones, considering that even if they're down someone might want to start
1159
      # them even in the event of a node failure.
1160
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1161
        needed_mem = 0
1162
        for instance in instances:
1163
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1164
          if bep[constants.BE_AUTO_BALANCE]:
1165
            needed_mem += bep[constants.BE_MEMORY]
1166
        test = nodeinfo['mfree'] < needed_mem
1167
        self._ErrorIf(test, self.ENODEN1, node,
1168
                      "not enough memory on to accommodate"
1169
                      " failovers should peer node %s fail", prinode)
1170

    
1171
  def CheckPrereq(self):
1172
    """Check prerequisites.
1173

1174
    Transform the list of checks we're going to skip into a set and check that
1175
    all its members are valid.
1176

1177
    """
1178
    self.skip_set = frozenset(self.op.skip_checks)
1179
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1180
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
1181

    
1182
  def BuildHooksEnv(self):
1183
    """Build hooks env.
1184

1185
    Cluster-Verify hooks just ran in the post phase and their failure makes
1186
    the output be logged in the verify output and the verification to fail.
1187

1188
    """
1189
    all_nodes = self.cfg.GetNodeList()
1190
    env = {
1191
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1192
      }
1193
    for node in self.cfg.GetAllNodesInfo().values():
1194
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1195

    
1196
    return env, [], all_nodes
1197

    
1198
  def Exec(self, feedback_fn):
1199
    """Verify integrity of cluster, performing various test on nodes.
1200

1201
    """
1202
    self.bad = False
1203
    _ErrorIf = self._ErrorIf
1204
    verbose = self.op.verbose
1205
    self._feedback_fn = feedback_fn
1206
    feedback_fn("* Verifying global settings")
1207
    for msg in self.cfg.VerifyConfig():
1208
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1209

    
1210
    vg_name = self.cfg.GetVGName()
1211
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1212
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1213
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1214
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1215
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1216
                        for iname in instancelist)
1217
    i_non_redundant = [] # Non redundant instances
1218
    i_non_a_balanced = [] # Non auto-balanced instances
1219
    n_offline = [] # List of offline nodes
1220
    n_drained = [] # List of nodes being drained
1221
    node_volume = {}
1222
    node_instance = {}
1223
    node_info = {}
1224
    instance_cfg = {}
1225

    
1226
    # FIXME: verify OS list
1227
    # do local checksums
1228
    master_files = [constants.CLUSTER_CONF_FILE]
1229

    
1230
    file_names = ssconf.SimpleStore().GetFileList()
1231
    file_names.append(constants.SSL_CERT_FILE)
1232
    file_names.append(constants.RAPI_CERT_FILE)
1233
    file_names.extend(master_files)
1234

    
1235
    local_checksums = utils.FingerprintFiles(file_names)
1236

    
1237
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1238
    node_verify_param = {
1239
      constants.NV_FILELIST: file_names,
1240
      constants.NV_NODELIST: [node.name for node in nodeinfo
1241
                              if not node.offline],
1242
      constants.NV_HYPERVISOR: hypervisors,
1243
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1244
                                  node.secondary_ip) for node in nodeinfo
1245
                                 if not node.offline],
1246
      constants.NV_INSTANCELIST: hypervisors,
1247
      constants.NV_VERSION: None,
1248
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1249
      }
1250
    if vg_name is not None:
1251
      node_verify_param[constants.NV_VGLIST] = None
1252
      node_verify_param[constants.NV_LVLIST] = vg_name
1253
      node_verify_param[constants.NV_DRBDLIST] = None
1254
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1255
                                           self.cfg.GetClusterName())
1256

    
1257
    cluster = self.cfg.GetClusterInfo()
1258
    master_node = self.cfg.GetMasterNode()
1259
    all_drbd_map = self.cfg.ComputeDRBDMap()
1260

    
1261
    feedback_fn("* Verifying node status")
1262
    for node_i in nodeinfo:
1263
      node = node_i.name
1264

    
1265
      if node_i.offline:
1266
        if verbose:
1267
          feedback_fn("* Skipping offline node %s" % (node,))
1268
        n_offline.append(node)
1269
        continue
1270

    
1271
      if node == master_node:
1272
        ntype = "master"
1273
      elif node_i.master_candidate:
1274
        ntype = "master candidate"
1275
      elif node_i.drained:
1276
        ntype = "drained"
1277
        n_drained.append(node)
1278
      else:
1279
        ntype = "regular"
1280
      if verbose:
1281
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1282

    
1283
      msg = all_nvinfo[node].fail_msg
1284
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1285
      if msg:
1286
        continue
1287

    
1288
      nresult = all_nvinfo[node].payload
1289
      node_drbd = {}
1290
      for minor, instance in all_drbd_map[node].items():
1291
        test = instance not in instanceinfo
1292
        _ErrorIf(test, self.ECLUSTERCFG, None,
1293
                 "ghost instance '%s' in temporary DRBD map", instance)
1294
          # ghost instance should not be running, but otherwise we
1295
          # don't give double warnings (both ghost instance and
1296
          # unallocated minor in use)
1297
        if test:
1298
          node_drbd[minor] = (instance, False)
1299
        else:
1300
          instance = instanceinfo[instance]
1301
          node_drbd[minor] = (instance.name, instance.admin_up)
1302
      self._VerifyNode(node_i, file_names, local_checksums,
1303
                       nresult, master_files, node_drbd, vg_name)
1304

    
1305
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1306
      if vg_name is None:
1307
        node_volume[node] = {}
1308
      elif isinstance(lvdata, basestring):
1309
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1310
                 utils.SafeEncode(lvdata))
1311
        node_volume[node] = {}
1312
      elif not isinstance(lvdata, dict):
1313
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1314
        continue
1315
      else:
1316
        node_volume[node] = lvdata
1317

    
1318
      # node_instance
1319
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1320
      test = not isinstance(idata, list)
1321
      _ErrorIf(test, self.ENODEHV, node,
1322
               "rpc call to node failed (instancelist)")
1323
      if test:
1324
        continue
1325

    
1326
      node_instance[node] = idata
1327

    
1328
      # node_info
1329
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1330
      test = not isinstance(nodeinfo, dict)
1331
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1332
      if test:
1333
        continue
1334

    
1335
      try:
1336
        node_info[node] = {
1337
          "mfree": int(nodeinfo['memory_free']),
1338
          "pinst": [],
1339
          "sinst": [],
1340
          # dictionary holding all instances this node is secondary for,
1341
          # grouped by their primary node. Each key is a cluster node, and each
1342
          # value is a list of instances which have the key as primary and the
1343
          # current node as secondary.  this is handy to calculate N+1 memory
1344
          # availability if you can only failover from a primary to its
1345
          # secondary.
1346
          "sinst-by-pnode": {},
1347
        }
1348
        # FIXME: devise a free space model for file based instances as well
1349
        if vg_name is not None:
1350
          test = (constants.NV_VGLIST not in nresult or
1351
                  vg_name not in nresult[constants.NV_VGLIST])
1352
          _ErrorIf(test, self.ENODELVM, node,
1353
                   "node didn't return data for the volume group '%s'"
1354
                   " - it is either missing or broken", vg_name)
1355
          if test:
1356
            continue
1357
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1358
      except (ValueError, KeyError):
1359
        _ErrorIf(True, self.ENODERPC, node,
1360
                 "node returned invalid nodeinfo, check lvm/hypervisor")
1361
        continue
1362

    
1363
    node_vol_should = {}
1364

    
1365
    feedback_fn("* Verifying instance status")
1366
    for instance in instancelist:
1367
      if verbose:
1368
        feedback_fn("* Verifying instance %s" % instance)
1369
      inst_config = instanceinfo[instance]
1370
      self._VerifyInstance(instance, inst_config, node_volume,
1371
                           node_instance, n_offline)
1372
      inst_nodes_offline = []
1373

    
1374
      inst_config.MapLVsByNode(node_vol_should)
1375

    
1376
      instance_cfg[instance] = inst_config
1377

    
1378
      pnode = inst_config.primary_node
1379
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
1380
               self.ENODERPC, pnode, "instance %s, connection to"
1381
               " primary node failed", instance)
1382
      if pnode in node_info:
1383
        node_info[pnode]['pinst'].append(instance)
1384

    
1385
      if pnode in n_offline:
1386
        inst_nodes_offline.append(pnode)
1387

    
1388
      # If the instance is non-redundant we cannot survive losing its primary
1389
      # node, so we are not N+1 compliant. On the other hand we have no disk
1390
      # templates with more than one secondary so that situation is not well
1391
      # supported either.
1392
      # FIXME: does not support file-backed instances
1393
      if len(inst_config.secondary_nodes) == 0:
1394
        i_non_redundant.append(instance)
1395
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
1396
               self.EINSTANCELAYOUT, instance,
1397
               "instance has multiple secondary nodes", code="WARNING")
1398

    
1399
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1400
        i_non_a_balanced.append(instance)
1401

    
1402
      for snode in inst_config.secondary_nodes:
1403
        _ErrorIf(snode not in node_info and snode not in n_offline,
1404
                 self.ENODERPC, snode,
1405
                 "instance %s, connection to secondary node"
1406
                 "failed", instance)
1407

    
1408
        if snode in node_info:
1409
          node_info[snode]['sinst'].append(instance)
1410
          if pnode not in node_info[snode]['sinst-by-pnode']:
1411
            node_info[snode]['sinst-by-pnode'][pnode] = []
1412
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1413

    
1414
        if snode in n_offline:
1415
          inst_nodes_offline.append(snode)
1416

    
1417
      # warn that the instance lives on offline nodes
1418
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1419
               "instance lives on offline node(s) %s",
1420
               ", ".join(inst_nodes_offline))
1421

    
1422
    feedback_fn("* Verifying orphan volumes")
1423
    self._VerifyOrphanVolumes(node_vol_should, node_volume)
1424

    
1425
    feedback_fn("* Verifying remaining instances")
1426
    self._VerifyOrphanInstances(instancelist, node_instance)
1427

    
1428
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1429
      feedback_fn("* Verifying N+1 Memory redundancy")
1430
      self._VerifyNPlusOneMemory(node_info, instance_cfg)
1431

    
1432
    feedback_fn("* Other Notes")
1433
    if i_non_redundant:
1434
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1435
                  % len(i_non_redundant))
1436

    
1437
    if i_non_a_balanced:
1438
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1439
                  % len(i_non_a_balanced))
1440

    
1441
    if n_offline:
1442
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1443

    
1444
    if n_drained:
1445
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1446

    
1447
    return not self.bad
1448

    
1449
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1450
    """Analyze the post-hooks' result
1451

1452
    This method analyses the hook result, handles it, and sends some
1453
    nicely-formatted feedback back to the user.
1454

1455
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1456
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1457
    @param hooks_results: the results of the multi-node hooks rpc call
1458
    @param feedback_fn: function used to send feedback back to the caller
1459
    @param lu_result: previous Exec result
1460
    @return: the new Exec result, based on the previous result
1461
        and hook results
1462

1463
    """
1464
    # We only really run POST phase hooks, and are only interested in
1465
    # their results
1466
    if phase == constants.HOOKS_PHASE_POST:
1467
      # Used to change hooks' output to proper indentation
1468
      indent_re = re.compile('^', re.M)
1469
      feedback_fn("* Hooks Results")
1470
      assert hooks_results, "invalid result from hooks"
1471

    
1472
      for node_name in hooks_results:
1473
        show_node_header = True
1474
        res = hooks_results[node_name]
1475
        msg = res.fail_msg
1476
        test = msg and not res.offline
1477
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
1478
                      "Communication failure in hooks execution: %s", msg)
1479
        if test:
1480
          # override manually lu_result here as _ErrorIf only
1481
          # overrides self.bad
1482
          lu_result = 1
1483
          continue
1484
        for script, hkr, output in res.payload:
1485
          test = hkr == constants.HKR_FAIL
1486
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
1487
                        "Script %s failed, output:", script)
1488
          if test:
1489
            output = indent_re.sub('      ', output)
1490
            feedback_fn("%s" % output)
1491
            lu_result = 1
1492

    
1493
      return lu_result
1494

    
1495

    
1496
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass
1517

    
1518
  def Exec(self, feedback_fn):
1519
    """Verify integrity of cluster disks.
1520

1521
    @rtype: tuple of three items
1522
    @return: a tuple of (dict of node-to-node_error, list of instances
1523
        which need activate-disks, dict of instance: (node, volume) for
1524
        missing volumes)
1525

1526
    """
1527
    result = res_nodes, res_instances, res_missing = {}, [], {}
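    # clarifying note: 'result' keeps references to the three fresh containers
    # above, so filling res_nodes/res_instances/res_missing below also fills
    # the returned tuple, which ends up looking roughly like
    # ({"node1": "rpc error"}, ["inst2"], {"inst3": [("node1", "xenvg/lv")]})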
1528

    
1529
    vg_name = self.cfg.GetVGName()
1530
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1531
    instances = [self.cfg.GetInstanceInfo(name)
1532
                 for name in self.cfg.GetInstanceList()]
1533

    
1534
    nv_dict = {}
1535
    for inst in instances:
1536
      inst_lvs = {}
1537
      if (not inst.admin_up or
1538
          inst.disk_template not in constants.DTS_NET_MIRROR):
1539
        continue
1540
      inst.MapLVsByNode(inst_lvs)
1541
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1542
      for node, vol_list in inst_lvs.iteritems():
1543
        for vol in vol_list:
1544
          nv_dict[(node, vol)] = inst
1545

    
1546
    if not nv_dict:
1547
      return result
1548

    
1549
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1550

    
1551
    for node in nodes:
1552
      # node_volume
1553
      node_res = node_lvs[node]
1554
      if node_res.offline:
1555
        continue
1556
      msg = node_res.fail_msg
1557
      if msg:
1558
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1559
        res_nodes[node] = msg
1560
        continue
1561

    
1562
      lvs = node_res.payload
1563
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1564
        inst = nv_dict.pop((node, lv_name), None)
1565
        if (not lv_online and inst is not None
1566
            and inst.name not in res_instances):
1567
          res_instances.append(inst.name)
1568

    
1569
    # any leftover items in nv_dict are missing LVs, let's arrange the
1570
    # data better
1571
    for key, inst in nv_dict.iteritems():
1572
      if inst.name not in res_missing:
1573
        res_missing[inst.name] = []
1574
      res_missing[inst.name].append(key)
1575

    
1576
    return result
1577

    
1578

    
1579
class LURepairDiskSizes(NoHooksLU):
1580
  """Verifies the cluster disks sizes.
1581

1582
  """
1583
  _OP_REQP = ["instances"]
1584
  REQ_BGL = False
1585

    
1586
  def ExpandNames(self):
1587
    if not isinstance(self.op.instances, list):
1588
      raise errors.OpPrereqError("Invalid argument type 'instances'")
1589

    
1590
    if self.op.instances:
1591
      self.wanted_names = []
1592
      for name in self.op.instances:
1593
        full_name = self.cfg.ExpandInstanceName(name)
1594
        if full_name is None:
1595
          raise errors.OpPrereqError("Instance '%s' not known" % name)
1596
        self.wanted_names.append(full_name)
1597
      self.needed_locks = {
1598
        locking.LEVEL_NODE: [],
1599
        locking.LEVEL_INSTANCE: self.wanted_names,
1600
        }
1601
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1602
    else:
1603
      self.wanted_names = None
1604
      self.needed_locks = {
1605
        locking.LEVEL_NODE: locking.ALL_SET,
1606
        locking.LEVEL_INSTANCE: locking.ALL_SET,
1607
        }
1608
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
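    # same effect as the dict.fromkeys(locking.LEVELS, 1) idiom used by other
    # LUs: acquire every lock level in shared mode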
1609

    
1610
  def DeclareLocks(self, level):
1611
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
1612
      self._LockInstancesNodes(primary_only=True)
1613

    
1614
  def CheckPrereq(self):
1615
    """Check prerequisites.
1616

1617
    This only checks the optional instance list against the existing names.
1618

1619
    """
1620
    if self.wanted_names is None:
1621
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1622

    
1623
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1624
                             in self.wanted_names]
1625

    
1626
  def _EnsureChildSizes(self, disk):
1627
    """Ensure children of the disk have the needed disk size.
1628

1629
    This is valid mainly for DRBD8 and fixes an issue where the
1630
    children have smaller disk size.
1631

1632
    @param disk: an L{ganeti.objects.Disk} object
1633

1634
    """
1635
    if disk.dev_type == constants.LD_DRBD8:
1636
      assert disk.children, "Empty children for DRBD8?"
1637
      fchild = disk.children[0]
1638
      mismatch = fchild.size < disk.size
1639
      if mismatch:
1640
        self.LogInfo("Child disk has size %d, parent %d, fixing",
1641
                     fchild.size, disk.size)
1642
        fchild.size = disk.size
1643

    
1644
      # and we recurse on this child only, not on the metadev
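      # (the return value is True when a size was fixed at this level or
      # below, so the caller knows the configuration needs to be updated)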
1645
      return self._EnsureChildSizes(fchild) or mismatch
1646
    else:
1647
      return False
1648

    
1649
  def Exec(self, feedback_fn):
1650
    """Verify the size of cluster disks.
1651

1652
    """
1653
    # TODO: check child disks too
1654
    # TODO: check differences in size between primary/secondary nodes
1655
    per_node_disks = {}
1656
    for instance in self.wanted_instances:
1657
      pnode = instance.primary_node
1658
      if pnode not in per_node_disks:
1659
        per_node_disks[pnode] = []
1660
      for idx, disk in enumerate(instance.disks):
1661
        per_node_disks[pnode].append((instance, idx, disk))
1662

    
1663
    changed = []
1664
    for node, dskl in per_node_disks.items():
1665
      newl = [v[2].Copy() for v in dskl]
1666
      for dsk in newl:
1667
        self.cfg.SetDiskID(dsk, node)
1668
      result = self.rpc.call_blockdev_getsizes(node, newl)
1669
      if result.fail_msg:
1670
        self.LogWarning("Failure in blockdev_getsizes call to node"
1671
                        " %s, ignoring", node)
1672
        continue
1673
      if len(result.data) != len(dskl):
1674
        self.LogWarning("Invalid result from node %s, ignoring node results",
1675
                        node)
1676
        continue
1677
      for ((instance, idx, disk), size) in zip(dskl, result.data):
1678
        if size is None:
1679
          self.LogWarning("Disk %d of instance %s did not return size"
1680
                          " information, ignoring", idx, instance.name)
1681
          continue
1682
        if not isinstance(size, (int, long)):
1683
          self.LogWarning("Disk %d of instance %s did not return valid"
1684
                          " size information, ignoring", idx, instance.name)
1685
          continue
1686
        size = size >> 20
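        # presumably the RPC reports sizes in bytes while disk.size is kept in
        # mebibytes, hence the 20-bit shift (2**20 bytes == 1 MiB)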
1687
        if size != disk.size:
1688
          self.LogInfo("Disk %d of instance %s has mismatched size,"
1689
                       " correcting: recorded %d, actual %d", idx,
1690
                       instance.name, disk.size, size)
1691
          disk.size = size
1692
          self.cfg.Update(instance)
1693
          changed.append((instance.name, idx, size))
1694
        if self._EnsureChildSizes(disk):
1695
          self.cfg.Update(instance)
1696
          changed.append((instance.name, idx, disk.size))
1697
    return changed
1698

    
1699

    
1700
class LURenameCluster(LogicalUnit):
1701
  """Rename the cluster.
1702

1703
  """
1704
  HPATH = "cluster-rename"
1705
  HTYPE = constants.HTYPE_CLUSTER
1706
  _OP_REQP = ["name"]
1707

    
1708
  def BuildHooksEnv(self):
1709
    """Build hooks env.
1710

1711
    """
1712
    env = {
1713
      "OP_TARGET": self.cfg.GetClusterName(),
1714
      "NEW_NAME": self.op.name,
1715
      }
1716
    mn = self.cfg.GetMasterNode()
1717
    return env, [mn], [mn]
1718

    
1719
  def CheckPrereq(self):
1720
    """Verify that the passed name is a valid one.
1721

1722
    """
1723
    hostname = utils.HostInfo(self.op.name)
1724

    
1725
    new_name = hostname.name
1726
    self.ip = new_ip = hostname.ip
1727
    old_name = self.cfg.GetClusterName()
1728
    old_ip = self.cfg.GetMasterIP()
1729
    if new_name == old_name and new_ip == old_ip:
1730
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1731
                                 " cluster has changed")
1732
    if new_ip != old_ip:
1733
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1734
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1735
                                   " reachable on the network. Aborting." %
1736
                                   new_ip)
1737

    
1738
    self.op.name = new_name
1739

    
1740
  def Exec(self, feedback_fn):
1741
    """Rename the cluster.
1742

1743
    """
1744
    clustername = self.op.name
1745
    ip = self.ip
1746

    
1747
    # shutdown the master IP
1748
    master = self.cfg.GetMasterNode()
1749
    result = self.rpc.call_node_stop_master(master, False)
1750
    result.Raise("Could not disable the master role")
1751

    
1752
    try:
1753
      cluster = self.cfg.GetClusterInfo()
1754
      cluster.cluster_name = clustername
1755
      cluster.master_ip = ip
1756
      self.cfg.Update(cluster)
1757

    
1758
      # update the known hosts file
1759
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1760
      node_list = self.cfg.GetNodeList()
1761
      try:
1762
        node_list.remove(master)
1763
      except ValueError:
1764
        pass
1765
      result = self.rpc.call_upload_file(node_list,
1766
                                         constants.SSH_KNOWN_HOSTS_FILE)
1767
      for to_node, to_result in result.iteritems():
1768
        msg = to_result.fail_msg
1769
        if msg:
1770
          msg = ("Copy of file %s to node %s failed: %s" %
1771
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1772
          self.proc.LogWarning(msg)
1773

    
1774
    finally:
1775
      result = self.rpc.call_node_start_master(master, False, False)
1776
      msg = result.fail_msg
1777
      if msg:
1778
        self.LogWarning("Could not re-enable the master role on"
1779
                        " the master, please restart manually: %s", msg)
1780

    
1781

    
1782
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
1796

    
1797

    
1798
class LUSetClusterParams(LogicalUnit):
1799
  """Change the parameters of the cluster.
1800

1801
  """
1802
  HPATH = "cluster-modify"
1803
  HTYPE = constants.HTYPE_CLUSTER
1804
  _OP_REQP = []
1805
  REQ_BGL = False
1806

    
1807
  def CheckArguments(self):
1808
    """Check parameters
1809

1810
    """
1811
    if not hasattr(self.op, "candidate_pool_size"):
1812
      self.op.candidate_pool_size = None
1813
    if self.op.candidate_pool_size is not None:
1814
      try:
1815
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1816
      except (ValueError, TypeError), err:
1817
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1818
                                   str(err))
1819
      if self.op.candidate_pool_size < 1:
1820
        raise errors.OpPrereqError("At least one master candidate needed")
1821

    
1822
  def ExpandNames(self):
1823
    # FIXME: in the future maybe other cluster params won't require checking on
1824
    # all nodes to be modified.
1825
    self.needed_locks = {
1826
      locking.LEVEL_NODE: locking.ALL_SET,
1827
    }
1828
    self.share_locks[locking.LEVEL_NODE] = 1
1829

    
1830
  def BuildHooksEnv(self):
1831
    """Build hooks env.
1832

1833
    """
1834
    env = {
1835
      "OP_TARGET": self.cfg.GetClusterName(),
1836
      "NEW_VG_NAME": self.op.vg_name,
1837
      }
1838
    mn = self.cfg.GetMasterNode()
1839
    return env, [mn], [mn]
1840

    
1841
  def CheckPrereq(self):
1842
    """Check prerequisites.
1843

1844
    This checks whether the given params don't conflict and
1845
    if the given volume group is valid.
1846

1847
    """
1848
    if self.op.vg_name is not None and not self.op.vg_name:
1849
      instances = self.cfg.GetAllInstancesInfo().values()
1850
      for inst in instances:
1851
        for disk in inst.disks:
1852
          if _RecursiveCheckIfLVMBased(disk):
1853
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1854
                                       " lvm-based instances exist")
1855

    
1856
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1857

    
1858
    # if vg_name not None, checks given volume group on all nodes
1859
    if self.op.vg_name:
1860
      vglist = self.rpc.call_vg_list(node_list)
1861
      for node in node_list:
1862
        msg = vglist[node].fail_msg
1863
        if msg:
1864
          # ignoring down node
1865
          self.LogWarning("Error while gathering data on node %s"
1866
                          " (ignoring node): %s", node, msg)
1867
          continue
1868
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1869
                                              self.op.vg_name,
1870
                                              constants.MIN_VG_SIZE)
1871
        if vgstatus:
1872
          raise errors.OpPrereqError("Error on node '%s': %s" %
1873
                                     (node, vgstatus))
1874

    
1875
    self.cluster = cluster = self.cfg.GetClusterInfo()
1876
    # validate params changes
1877
    if self.op.beparams:
1878
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1879
      self.new_beparams = objects.FillDict(
1880
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1881

    
1882
    if self.op.nicparams:
1883
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1884
      self.new_nicparams = objects.FillDict(
1885
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1886
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1887

    
1888
    # hypervisor list/parameters
1889
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1890
    if self.op.hvparams:
1891
      if not isinstance(self.op.hvparams, dict):
1892
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1893
      for hv_name, hv_dict in self.op.hvparams.items():
1894
        if hv_name not in self.new_hvparams:
1895
          self.new_hvparams[hv_name] = hv_dict
1896
        else:
1897
          self.new_hvparams[hv_name].update(hv_dict)
1898

    
1899
    if self.op.enabled_hypervisors is not None:
1900
      self.hv_list = self.op.enabled_hypervisors
1901
      if not self.hv_list:
1902
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1903
                                   " least one member")
1904
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1905
      if invalid_hvs:
1906
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1907
                                   " entries: %s" %
1908
                                   utils.CommaJoin(invalid_hvs))
1909
    else:
1910
      self.hv_list = cluster.enabled_hypervisors
1911

    
1912
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1913
      # either the enabled list has changed, or the parameters have, validate
1914
      for hv_name, hv_params in self.new_hvparams.items():
1915
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1916
            (self.op.enabled_hypervisors and
1917
             hv_name in self.op.enabled_hypervisors)):
1918
          # either this is a new hypervisor, or its parameters have changed
1919
          hv_class = hypervisor.GetHypervisor(hv_name)
1920
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1921
          hv_class.CheckParameterSyntax(hv_params)
1922
          _CheckHVParams(self, node_list, hv_name, hv_params)
1923

    
1924
  def Exec(self, feedback_fn):
1925
    """Change the parameters of the cluster.
1926

1927
    """
1928
    if self.op.vg_name is not None:
1929
      new_volume = self.op.vg_name
1930
      if not new_volume:
1931
        new_volume = None
1932
      if new_volume != self.cfg.GetVGName():
1933
        self.cfg.SetVGName(new_volume)
1934
      else:
1935
        feedback_fn("Cluster LVM configuration already in desired"
1936
                    " state, not changing")
1937
    if self.op.hvparams:
1938
      self.cluster.hvparams = self.new_hvparams
1939
    if self.op.enabled_hypervisors is not None:
1940
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1941
    if self.op.beparams:
1942
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1943
    if self.op.nicparams:
1944
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1945

    
1946
    if self.op.candidate_pool_size is not None:
1947
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1948
      # we need to update the pool size here, otherwise the save will fail
1949
      _AdjustCandidatePool(self)
1950

    
1951
    self.cfg.Update(self.cluster)
1952

    
1953

    
1954
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)
1995

    
1996

    
1997
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)
2023

    
2024

    
2025
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2026
  """Sleep and poll for an instance's disk to sync.
2027

2028
  """
2029
  if not instance.disks:
2030
    return True
2031

    
2032
  if not oneshot:
2033
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2034

    
2035
  node = instance.primary_node
2036

    
2037
  for dev in instance.disks:
2038
    lu.cfg.SetDiskID(dev, node)
2039

    
2040
  retries = 0
2041
  degr_retries = 10 # in seconds, as we sleep 1 second each time
2042
  while True:
2043
    max_time = 0
2044
    done = True
2045
    cumul_degraded = False
2046
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2047
    msg = rstats.fail_msg
2048
    if msg:
2049
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2050
      retries += 1
2051
      if retries >= 10:
2052
        raise errors.RemoteError("Can't contact node %s for mirror data,"
2053
                                 " aborting." % node)
2054
      time.sleep(6)
2055
      continue
2056
    rstats = rstats.payload
2057
    retries = 0
2058
    for i, mstat in enumerate(rstats):
2059
      if mstat is None:
2060
        lu.LogWarning("Can't compute data for node %s/%s",
2061
                           node, instance.disks[i].iv_name)
2062
        continue
2063

    
2064
      cumul_degraded = (cumul_degraded or
2065
                        (mstat.is_degraded and mstat.sync_percent is None))
2066
      if mstat.sync_percent is not None:
2067
        done = False
2068
        if mstat.estimated_time is not None:
2069
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2070
          max_time = mstat.estimated_time
2071
        else:
2072
          rem_time = "no time estimate"
2073
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2074
                        (instance.disks[i].iv_name, mstat.sync_percent,
2075
                         rem_time))
2076

    
2077
    # if we're done but degraded, let's do a few small retries, to
2078
    # make sure we see a stable and not transient situation; therefore
2079
    # we force restart of the loop
2080
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
2081
      logging.info("Degraded disks found, %d retries left", degr_retries)
2082
      degr_retries -= 1
2083
      time.sleep(1)
2084
      continue
2085

    
2086
    if done or oneshot:
2087
      break
2088

    
2089
    time.sleep(min(60, max_time))
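    # sleep at most a minute between polls, presumably so progress keeps
    # being reported even when the remaining sync time estimate is large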
2090

    
2091
  if done:
2092
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2093
  return not cumul_degraded
2094

    
2095

    
2096
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
2128

    
2129

    
2130
class LUDiagnoseOS(NoHooksLU):
2131
  """Logical unit for OS diagnose/query.
2132

2133
  """
2134
  _OP_REQP = ["output_fields", "names"]
2135
  REQ_BGL = False
2136
  _FIELDS_STATIC = utils.FieldSet()
2137
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2138

    
2139
  def ExpandNames(self):
2140
    if self.op.names:
2141
      raise errors.OpPrereqError("Selective OS query not supported")
2142

    
2143
    _CheckOutputFields(static=self._FIELDS_STATIC,
2144
                       dynamic=self._FIELDS_DYNAMIC,
2145
                       selected=self.op.output_fields)
2146

    
2147
    # Lock all nodes, in shared mode
2148
    # Temporary removal of locks, should be reverted later
2149
    # TODO: reintroduce locks when they are lighter-weight
2150
    self.needed_locks = {}
2151
    #self.share_locks[locking.LEVEL_NODE] = 1
2152
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2153

    
2154
  def CheckPrereq(self):
2155
    """Check prerequisites.
2156

2157
    """
2158

    
2159
  @staticmethod
2160
  def _DiagnoseByOS(node_list, rlist):
2161
    """Remaps a per-node return list into an a per-os per-node dictionary
2162

2163
    @param node_list: a list with the names of all nodes
2164
    @param rlist: a map with node names as keys and OS objects as values
2165

2166
    @rtype: dict
2167
    @return: a dictionary with osnames as keys and as value another map, with
2168
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
2169

2170
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2171
                                     (/srv/..., False, "invalid api")],
2172
                           "node2": [(/srv/..., True, "")]}
2173
          }
2174

2175
    """
2176
    all_os = {}
2177
    # we build here the list of nodes that didn't fail the RPC (at RPC
2178
    # level), so that nodes with a non-responding node daemon don't
2179
    # make all OSes invalid
2180
    good_nodes = [node_name for node_name in rlist
2181
                  if not rlist[node_name].fail_msg]
2182
    for node_name, nr in rlist.items():
2183
      if nr.fail_msg or not nr.payload:
2184
        continue
2185
      for name, path, status, diagnose in nr.payload:
2186
        if name not in all_os:
2187
          # build a list of nodes for this os containing empty lists
2188
          # for each node in node_list
2189
          all_os[name] = {}
2190
          for nname in good_nodes:
2191
            all_os[name][nname] = []
2192
        all_os[name][node_name].append((path, status, diagnose))
2193
    return all_os
2194

    
2195
  def Exec(self, feedback_fn):
2196
    """Compute the list of OSes.
2197

2198
    """
2199
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2200
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2201
    pol = self._DiagnoseByOS(valid_nodes, node_data)
2202
    output = []
2203
    for os_name, os_data in pol.items():
2204
      row = []
2205
      for field in self.op.output_fields:
2206
        if field == "name":
2207
          val = os_name
2208
        elif field == "valid":
2209
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2210
        elif field == "node_status":
2211
          # this is just a copy of the dict
2212
          val = {}
2213
          for node_name, nos_list in os_data.items():
2214
            val[node_name] = nos_list
2215
        else:
2216
          raise errors.ParameterError(field)
2217
        row.append(val)
2218
      output.append(row)
2219

    
2220
    return output
2221

    
2222

    
2223
class LURemoveNode(LogicalUnit):
2224
  """Logical unit for removing a node.
2225

2226
  """
2227
  HPATH = "node-remove"
2228
  HTYPE = constants.HTYPE_NODE
2229
  _OP_REQP = ["node_name"]
2230

    
2231
  def BuildHooksEnv(self):
2232
    """Build hooks env.
2233

2234
    This doesn't run on the target node in the pre phase as a failed
2235
    node would then be impossible to remove.
2236

2237
    """
2238
    env = {
2239
      "OP_TARGET": self.op.node_name,
2240
      "NODE_NAME": self.op.node_name,
2241
      }
2242
    all_nodes = self.cfg.GetNodeList()
2243
    if self.op.node_name in all_nodes:
2244
      all_nodes.remove(self.op.node_name)
2245
    return env, all_nodes, all_nodes
2246

    
2247
  def CheckPrereq(self):
2248
    """Check prerequisites.
2249

2250
    This checks:
2251
     - the node exists in the configuration
2252
     - it does not have primary or secondary instances
2253
     - it's not the master
2254

2255
    Any errors are signaled by raising errors.OpPrereqError.
2256

2257
    """
2258
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2259
    if node is None:
2260
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2261

    
2262
    instance_list = self.cfg.GetInstanceList()
2263

    
2264
    masternode = self.cfg.GetMasterNode()
2265
    if node.name == masternode:
2266
      raise errors.OpPrereqError("Node is the master node,"
2267
                                 " you need to failover first.")
2268

    
2269
    for instance_name in instance_list:
2270
      instance = self.cfg.GetInstanceInfo(instance_name)
2271
      if node.name in instance.all_nodes:
2272
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2273
                                   " please remove first." % instance_name)
2274
    self.op.node_name = node.name
2275
    self.node = node
2276

    
2277
  def Exec(self, feedback_fn):
2278
    """Removes the node from the cluster.
2279

2280
    """
2281
    node = self.node
2282
    logging.info("Stopping the node daemon and removing configs from node %s",
2283
                 node.name)
2284

    
2285
    self.context.RemoveNode(node.name)
2286

    
2287
    # Run post hooks on the node before it's removed
2288
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2289
    try:
2290
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2291
    except:
2292
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2293

    
2294
    result = self.rpc.call_node_leave_cluster(node.name)
2295
    msg = result.fail_msg
2296
    if msg:
2297
      self.LogWarning("Errors encountered on the remote node while leaving"
2298
                      " the cluster: %s", msg)
2299

    
2300
    # Promote nodes to master candidate as needed
2301
    _AdjustCandidatePool(self)
2302

    
2303

    
2304
class LUQueryNodes(NoHooksLU):
2305
  """Logical unit for querying nodes.
2306

2307
  """
2308
  _OP_REQP = ["output_fields", "names", "use_locking"]
2309
  REQ_BGL = False
2310

    
2311
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2312
                    "master_candidate", "offline", "drained"]
2313

    
2314
  _FIELDS_DYNAMIC = utils.FieldSet(
2315
    "dtotal", "dfree",
2316
    "mtotal", "mnode", "mfree",
2317
    "bootid",
2318
    "ctotal", "cnodes", "csockets",
2319
    )
2320

    
2321
  _FIELDS_STATIC = utils.FieldSet(*[
2322
    "pinst_cnt", "sinst_cnt",
2323
    "pinst_list", "sinst_list",
2324
    "pip", "sip", "tags",
2325
    "master",
2326
    "role"] + _SIMPLE_FIELDS
2327
    )
2328

    
2329
  def ExpandNames(self):
2330
    _CheckOutputFields(static=self._FIELDS_STATIC,
2331
                       dynamic=self._FIELDS_DYNAMIC,
2332
                       selected=self.op.output_fields)
2333

    
2334
    self.needed_locks = {}
2335
    self.share_locks[locking.LEVEL_NODE] = 1
2336

    
2337
    if self.op.names:
2338
      self.wanted = _GetWantedNodes(self, self.op.names)
2339
    else:
2340
      self.wanted = locking.ALL_SET
2341

    
2342
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2343
    self.do_locking = self.do_node_query and self.op.use_locking
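    # locking is only needed when live (non-static) data was requested and
    # the caller asked for it; purely static queries can run lock-free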
2344
    if self.do_locking:
2345
      # if we don't request only static fields, we need to lock the nodes
2346
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2347

    
2348

    
2349
  def CheckPrereq(self):
2350
    """Check prerequisites.
2351

2352
    """
2353
    # The validation of the node list is done in the _GetWantedNodes,
2354
    # if non empty, and if empty, there's no validation to do
2355
    pass
2356

    
2357
  def Exec(self, feedback_fn):
2358
    """Computes the list of nodes and their attributes.
2359

2360
    """
2361
    all_info = self.cfg.GetAllNodesInfo()
2362
    if self.do_locking:
2363
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2364
    elif self.wanted != locking.ALL_SET:
2365
      nodenames = self.wanted
2366
      missing = set(nodenames).difference(all_info.keys())
2367
      if missing:
2368
        raise errors.OpExecError(
2369
          "Some nodes were removed before retrieving their data: %s" % missing)
2370
    else:
2371
      nodenames = all_info.keys()
2372

    
2373
    nodenames = utils.NiceSort(nodenames)
2374
    nodelist = [all_info[name] for name in nodenames]
2375

    
2376
    # begin data gathering
2377

    
2378
    if self.do_node_query:
2379
      live_data = {}
2380
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2381
                                          self.cfg.GetHypervisorType())
2382
      for name in nodenames:
2383
        nodeinfo = node_data[name]
2384
        if not nodeinfo.fail_msg and nodeinfo.payload:
2385
          nodeinfo = nodeinfo.payload
2386
          fn = utils.TryConvert
2387
          live_data[name] = {
2388
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2389
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2390
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2391
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2392
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2393
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2394
            "bootid": nodeinfo.get('bootid', None),
2395
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2396
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2397
            }
2398
        else:
2399
          live_data[name] = {}
2400
    else:
2401
      live_data = dict.fromkeys(nodenames, {})
2402

    
2403
    node_to_primary = dict([(name, set()) for name in nodenames])
2404
    node_to_secondary = dict([(name, set()) for name in nodenames])
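    # reverse mappings, filled below; e.g. node_to_primary["node1"] becomes
    # the set of instance names whose primary node is node1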
2405

    
2406
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2407
                             "sinst_cnt", "sinst_list"))
2408
    if inst_fields & frozenset(self.op.output_fields):
2409
      instancelist = self.cfg.GetInstanceList()
2410

    
2411
      for instance_name in instancelist:
2412
        inst = self.cfg.GetInstanceInfo(instance_name)
2413
        if inst.primary_node in node_to_primary:
2414
          node_to_primary[inst.primary_node].add(inst.name)
2415
        for secnode in inst.secondary_nodes:
2416
          if secnode in node_to_secondary:
2417
            node_to_secondary[secnode].add(inst.name)
2418

    
2419
    master_node = self.cfg.GetMasterNode()
2420

    
2421
    # end data gathering
2422

    
2423
    output = []
2424
    for node in nodelist:
2425
      node_output = []
2426
      for field in self.op.output_fields:
2427
        if field in self._SIMPLE_FIELDS:
2428
          val = getattr(node, field)
2429
        elif field == "pinst_list":
2430
          val = list(node_to_primary[node.name])
2431
        elif field == "sinst_list":
2432
          val = list(node_to_secondary[node.name])
2433
        elif field == "pinst_cnt":
2434
          val = len(node_to_primary[node.name])
2435
        elif field == "sinst_cnt":
2436
          val = len(node_to_secondary[node.name])
2437
        elif field == "pip":
2438
          val = node.primary_ip
2439
        elif field == "sip":
2440
          val = node.secondary_ip
2441
        elif field == "tags":
2442
          val = list(node.GetTags())
2443
        elif field == "master":
2444
          val = node.name == master_node
2445
        elif self._FIELDS_DYNAMIC.Matches(field):
2446
          val = live_data[node.name].get(field, None)
2447
        elif field == "role":
2448
          if node.name == master_node:
2449
            val = "M"
2450
          elif node.master_candidate:
2451
            val = "C"
2452
          elif node.drained:
2453
            val = "D"
2454
          elif node.offline:
2455
            val = "O"
2456
          else:
2457
            val = "R"
2458
        else:
2459
          raise errors.ParameterError(field)
2460
        node_output.append(val)
2461
      output.append(node_output)
2462

    
2463
    return output
2464

    
2465

    
2466
class LUQueryNodeVolumes(NoHooksLU):
2467
  """Logical unit for getting volumes on node(s).
2468

2469
  """
2470
  _OP_REQP = ["nodes", "output_fields"]
2471
  REQ_BGL = False
2472
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2473
  _FIELDS_STATIC = utils.FieldSet("node")
2474

    
2475
  def ExpandNames(self):
2476
    _CheckOutputFields(static=self._FIELDS_STATIC,
2477
                       dynamic=self._FIELDS_DYNAMIC,
2478
                       selected=self.op.output_fields)
2479

    
2480
    self.needed_locks = {}
2481
    self.share_locks[locking.LEVEL_NODE] = 1
2482
    if not self.op.nodes:
2483
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2484
    else:
2485
      self.needed_locks[locking.LEVEL_NODE] = \
2486
        _GetWantedNodes(self, self.op.nodes)
2487

    
2488
  def CheckPrereq(self):
2489
    """Check prerequisites.
2490

2491
    This checks that the fields required are valid output fields.
2492

2493
    """
2494
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2495

    
2496
  def Exec(self, feedback_fn):
2497
    """Computes the list of nodes and their attributes.
2498

2499
    """
2500
    nodenames = self.nodes
2501
    volumes = self.rpc.call_node_volumes(nodenames)
2502

    
2503
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2504
             in self.cfg.GetInstanceList()]
2505

    
2506
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2507

    
2508
    output = []
2509
    for node in nodenames:
2510
      nresult = volumes[node]
2511
      if nresult.offline:
2512
        continue
2513
      msg = nresult.fail_msg
2514
      if msg:
2515
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2516
        continue
2517

    
2518
      node_vols = nresult.payload[:]
2519
      node_vols.sort(key=lambda vol: vol['dev'])
2520

    
2521
      for vol in node_vols:
2522
        node_output = []
2523
        for field in self.op.output_fields:
2524
          if field == "node":
2525
            val = node
2526
          elif field == "phys":
2527
            val = vol['dev']
2528
          elif field == "vg":
2529
            val = vol['vg']
2530
          elif field == "name":
2531
            val = vol['name']
2532
          elif field == "size":
2533
            val = int(float(vol['size']))
2534
          elif field == "instance":
2535
            for inst in ilist:
2536
              if node not in lv_by_node[inst]:
2537
                continue
2538
              if vol['name'] in lv_by_node[inst][node]:
2539
                val = inst.name
2540
                break
2541
            else:
2542
              val = '-'
2543
          else:
2544
            raise errors.ParameterError(field)
2545
          node_output.append(str(val))
2546

    
2547
        output.append(node_output)
2548

    
2549
    return output
2550

    
2551

    
2552
class LUQueryNodeStorage(NoHooksLU):
2553
  """Logical unit for getting information on storage units on node(s).
2554

2555
  """
2556
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
2557
  REQ_BGL = False
2558
  _FIELDS_STATIC = utils.FieldSet("node")
2559

    
2560
  def ExpandNames(self):
2561
    storage_type = self.op.storage_type
2562

    
2563
    if storage_type not in constants.VALID_STORAGE_FIELDS:
2564
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2565

    
2566
    dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2567

    
2568
    _CheckOutputFields(static=self._FIELDS_STATIC,
2569
                       dynamic=utils.FieldSet(*dynamic_fields),
2570
                       selected=self.op.output_fields)
2571

    
2572
    self.needed_locks = {}
2573
    self.share_locks[locking.LEVEL_NODE] = 1
2574

    
2575
    if self.op.nodes:
2576
      self.needed_locks[locking.LEVEL_NODE] = \
2577
        _GetWantedNodes(self, self.op.nodes)
2578
    else:
2579
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2580

    
2581
  def CheckPrereq(self):
2582
    """Check prerequisites.
2583

2584
    This checks that the fields required are valid output fields.
2585

2586
    """
2587
    self.op.name = getattr(self.op, "name", None)
2588

    
2589
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2590

    
2591
  def Exec(self, feedback_fn):
2592
    """Computes the list of nodes and their attributes.
2593

2594
    """
2595
    # Always get name to sort by
2596
    if constants.SF_NAME in self.op.output_fields:
2597
      fields = self.op.output_fields[:]
2598
    else:
2599
      fields = [constants.SF_NAME] + self.op.output_fields
2600

    
2601
    # Never ask for node as it's only known to the LU
2602
    while "node" in fields:
2603
      fields.remove("node")
2604

    
2605
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2606
    name_idx = field_idx[constants.SF_NAME]
2607

    
2608
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2609
    data = self.rpc.call_storage_list(self.nodes,
2610
                                      self.op.storage_type, st_args,
2611
                                      self.op.name, fields)
2612

    
2613
    result = []
2614

    
2615
    for node in utils.NiceSort(self.nodes):
2616
      nresult = data[node]
2617
      if nresult.offline:
2618
        continue
2619

    
2620
      msg = nresult.fail_msg
2621
      if msg:
2622
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2623
        continue
2624

    
2625
      rows = dict([(row[name_idx], row) for row in nresult.payload])
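      # key the returned rows by storage unit name so they can be emitted in
      # NiceSort order below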
2626

    
2627
      for name in utils.NiceSort(rows.keys()):
2628
        row = rows[name]
2629

    
2630
        out = []
2631

    
2632
        for field in self.op.output_fields:
2633
          if field == "node":
2634
            val = node
2635
          elif field in field_idx:
2636
            val = row[field_idx[field]]
2637
          else:
2638
            raise errors.ParameterError(field)
2639

    
2640
          out.append(val)
2641

    
2642
        result.append(out)
2643

    
2644
    return result
2645

    
2646

    
2647
class LUModifyNodeStorage(NoHooksLU):
2648
  """Logical unit for modifying a storage volume on a node.
2649

2650
  """
2651
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2652
  REQ_BGL = False
2653

    
2654
  def CheckArguments(self):
2655
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2656
    if node_name is None:
2657
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2658

    
2659
    self.op.node_name = node_name
2660

    
2661
    storage_type = self.op.storage_type
2662
    if storage_type not in constants.VALID_STORAGE_FIELDS:
2663
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2664

    
2665
  def ExpandNames(self):
2666
    self.needed_locks = {
2667
      locking.LEVEL_NODE: self.op.node_name,
2668
      }
2669

    
2670
  def CheckPrereq(self):
2671
    """Check prerequisites.
2672

2673
    """
2674
    storage_type = self.op.storage_type
2675

    
2676
    try:
2677
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2678
    except KeyError:
2679
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
2680
                                 " modified" % storage_type)
2681

    
2682
    diff = set(self.op.changes.keys()) - modifiable
2683
    if diff:
2684
      raise errors.OpPrereqError("The following fields can not be modified for"
2685
                                 " storage units of type '%s': %r" %
2686
                                 (storage_type, list(diff)))
2687

    
2688
  def Exec(self, feedback_fn):
2689
    """Computes the list of nodes and their attributes.
2690

2691
    """
2692
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2693
    result = self.rpc.call_storage_modify(self.op.node_name,
2694
                                          self.op.storage_type, st_args,
2695
                                          self.op.name, self.op.changes)
2696
    result.Raise("Failed to modify storage unit '%s' on %s" %
2697
                 (self.op.name, self.op.node_name))
2698

    
2699

    
2700
class LUAddNode(LogicalUnit):
2701
  """Logical unit for adding node to the cluster.
2702

2703
  """
2704
  HPATH = "node-add"
2705
  HTYPE = constants.HTYPE_NODE
2706
  _OP_REQP = ["node_name"]
2707

    
2708
  def BuildHooksEnv(self):
2709
    """Build hooks env.
2710

2711
    This will run on all nodes before, and on all nodes + the new node after.
2712

2713
    """
2714
    env = {
2715
      "OP_TARGET": self.op.node_name,
2716
      "NODE_NAME": self.op.node_name,
2717
      "NODE_PIP": self.op.primary_ip,
2718
      "NODE_SIP": self.op.secondary_ip,
2719
      }
2720
    nodes_0 = self.cfg.GetNodeList()
2721
    nodes_1 = nodes_0 + [self.op.node_name, ]
2722
    return env, nodes_0, nodes_1
2723

    
2724
  def CheckPrereq(self):
2725
    """Check prerequisites.
2726

2727
    This checks:
2728
     - the new node is not already in the config
2729
     - it is resolvable
2730
     - its parameters (single/dual homed) matches the cluster
2731

2732
    Any errors are signaled by raising errors.OpPrereqError.
2733

2734
    """
2735
    node_name = self.op.node_name
2736
    cfg = self.cfg
2737

    
2738
    dns_data = utils.HostInfo(node_name)
2739

    
2740
    node = dns_data.name
2741
    primary_ip = self.op.primary_ip = dns_data.ip
2742
    secondary_ip = getattr(self.op, "secondary_ip", None)
2743
    if secondary_ip is None:
2744
      secondary_ip = primary_ip
2745
    if not utils.IsValidIP(secondary_ip):
2746
      raise errors.OpPrereqError("Invalid secondary IP given")
2747
    self.op.secondary_ip = secondary_ip
2748

    
2749
    node_list = cfg.GetNodeList()
2750
    if not self.op.readd and node in node_list:
2751
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2752
                                 node)
2753
    elif self.op.readd and node not in node_list:
2754
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2755

    
2756
    for existing_node_name in node_list:
2757
      existing_node = cfg.GetNodeInfo(existing_node_name)
2758

    
2759
      if self.op.readd and node == existing_node_name:
2760
        if (existing_node.primary_ip != primary_ip or
2761
            existing_node.secondary_ip != secondary_ip):
2762
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2763
                                     " address configuration as before")
2764
        continue
2765

    
2766
      if (existing_node.primary_ip == primary_ip or
2767
          existing_node.secondary_ip == primary_ip or
2768
          existing_node.primary_ip == secondary_ip or
2769
          existing_node.secondary_ip == secondary_ip):
2770
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2771
                                   " existing node %s" % existing_node.name)
2772

    
2773
    # check that the type of the node (single versus dual homed) is the
2774
    # same as for the master
2775
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2776
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2777
    newbie_singlehomed = secondary_ip == primary_ip
2778
    if master_singlehomed != newbie_singlehomed:
2779
      if master_singlehomed:
2780
        raise errors.OpPrereqError("The master has no private ip but the"
2781
                                   " new node has one")
2782
      else:
2783
        raise errors.OpPrereqError("The master has a private ip but the"
2784
                                   " new node doesn't have one")
2785

    
2786
    # checks reachability
2787
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2788
      raise errors.OpPrereqError("Node not reachable by ping")
2789

    
2790
    if not newbie_singlehomed:
2791
      # check reachability from my secondary ip to newbie's secondary ip
2792
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2793
                           source=myself.secondary_ip):
2794
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2795
                                   " based ping to noded port")
2796

    
2797
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2798
    if self.op.readd:
2799
      exceptions = [node]
2800
    else:
2801
      exceptions = []
2802
    mc_now, mc_max, _ = self.cfg.GetMasterCandidateStats(exceptions)
2803
    # the new node will increase mc_max with one, so:
2804
    mc_max = min(mc_max + 1, cp_size)
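    # illustrative example (assumed numbers): with candidate_pool_size=10 and
    # three nodes that are all candidates, mc_now=3 and mc_max becomes 4, so
    # the node being added will be made a master candidate as well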
2805
    self.master_candidate = mc_now < mc_max
2806

    
2807
    if self.op.readd:
2808
      self.new_node = self.cfg.GetNodeInfo(node)
2809
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2810
    else:
2811
      self.new_node = objects.Node(name=node,
2812
                                   primary_ip=primary_ip,
2813
                                   secondary_ip=secondary_ip,
2814
                                   master_candidate=self.master_candidate,
2815
                                   offline=False, drained=False)
2816

    
2817
  def Exec(self, feedback_fn):
2818
    """Adds the new node to the cluster.
2819

2820
    """
2821
    new_node = self.new_node
2822
    node = new_node.name
2823

    
2824
    # for re-adds, reset the offline/drained/master-candidate flags;
2825
    # we need to reset here, otherwise offline would prevent RPC calls
2826
    # later in the procedure; this also means that if the re-add
2827
    # fails, we are left with a non-offlined, broken node
2828
    if self.op.readd:
2829
      new_node.drained = new_node.offline = False
2830
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2831
      # if we demote the node, we do cleanup later in the procedure
2832
      new_node.master_candidate = self.master_candidate
2833

    
2834
    # notify the user about any possible mc promotion
2835
    if new_node.master_candidate:
2836
      self.LogInfo("Node will be a master candidate")
2837

    
2838
    # check connectivity
2839
    result = self.rpc.call_version([node])[node]
2840
    result.Raise("Can't get version information from node %s" % node)
2841
    if constants.PROTOCOL_VERSION == result.payload:
2842
      logging.info("Communication to node %s fine, sw version %s match",
2843
                   node, result.payload)
2844
    else:
2845
      raise errors.OpExecError("Version mismatch master version %s,"
2846
                               " node version %s" %
2847
                               (constants.PROTOCOL_VERSION, result.payload))
2848

    
2849
    # setup ssh on node
2850
    logging.info("Copy ssh key to node %s", node)
2851
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2852
    keyarray = []
2853
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2854
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2855
                priv_key, pub_key]
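    # the six files above are the node's host DSA/RSA key pairs plus the SSH
    # key pair of the ganeti run-as user, passed to node_add in this order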
2856

    
2857
    for i in keyfiles:
2858
      keyarray.append(utils.ReadFile(i))
2859

    
2860
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2861
                                    keyarray[2],
2862
                                    keyarray[3], keyarray[4], keyarray[5])
2863
    result.Raise("Cannot transfer ssh keys to the new node")
2864

    
2865
    # Add node to our /etc/hosts, and add key to known_hosts
2866
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2867
      utils.AddHostToEtcHosts(new_node.name)
2868

    
2869
    if new_node.secondary_ip != new_node.primary_ip:
2870
      result = self.rpc.call_node_has_ip_address(new_node.name,
2871
                                                 new_node.secondary_ip)
2872
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2873
                   prereq=True)
2874
      if not result.payload:
2875
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2876
                                 " you gave (%s). Please fix and re-run this"
2877
                                 " command." % new_node.secondary_ip)
2878

    
2879
    node_verify_list = [self.cfg.GetMasterNode()]
2880
    node_verify_param = {
2881
      constants.NV_NODELIST: [node],
2882
      # TODO: do a node-net-test as well?
2883
    }
2884

    
2885
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2886
                                       self.cfg.GetClusterName())
2887
    for verifier in node_verify_list:
2888
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2889
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
2890
      if nl_payload:
2891
        for failed in nl_payload:
2892
          feedback_fn("ssh/hostname verification failed"
2893
                      " (checking from %s): %s" %
2894
                      (verifier, nl_payload[failed]))
2895
        raise errors.OpExecError("ssh/hostname verification failed.")
2896

    
2897
    if self.op.readd:
2898
      _RedistributeAncillaryFiles(self)
2899
      self.context.ReaddNode(new_node)
2900
      # make sure we redistribute the config
2901
      self.cfg.Update(new_node)
2902
      # and make sure the new node will not have old files around
2903
      if not new_node.master_candidate:
2904
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2905
        msg = result.fail_msg
2906
        if msg:
2907
          self.LogWarning("Node failed to demote itself from master"
2908
                          " candidate status: %s" % msg)
2909
    else:
2910
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2911
      self.context.AddNode(new_node)
2912

    
2913

    
2914
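# The LU below treats the three node flags as a small state machine: at most
# one of offline/drained/master_candidate may be set to True in a single
# request, forcing a node offline or drained auto-demotes it from master
# candidate status, and the master node's flags cannot be changed at all.
# A hypothetical invocation through the opcode layer (the OpSetNodeParams
# name and its fields are assumed from opcodes.py, not defined here) could
# look like:
#
#   op = opcodes.OpSetNodeParams(node_name="node2.example.com",
#                                drained=True, force=False)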
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the requested flag changes against the current node state.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover")

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


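# Powercycling is a last-resort action: the LU below deliberately grabs no
# locks, refuses to touch the master node unless the force flag is given,
# and simply schedules the reboot via the node_powercycle RPC using the
# cluster's configured hypervisor type.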
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


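# The helper below returns a (disks_ok, device_info) pair; each device_info
# entry maps a disk to its device on the primary node.  For a single-disk
# DRBD instance the list might look like (hypothetical values):
#
#   [("node1.example.com", "disk/0", "/dev/drbd0")]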
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a (disks_ok, device_info) tuple, where device_info is a list
      of (host, instance_visible_name, node_visible_name) tuples
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


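# Worked example for the check below: asking for requested=2048 MiB on a
# node whose node_info RPC reports memory_free=1024 raises
# OpPrereqError("Not enough memory on node ...: needed 2048 MiB,
# available 1024 MiB"); a missing or non-integer memory_free value is
# also treated as a failure.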
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


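# Parameter layering in the startup LU below: the effective hypervisor
# parameters are built locally as cluster defaults filled with the instance's
# own hvparams, then overlaid with any one-off hvparams passed in the opcode;
# the overrides are validated and handed to the instance_start RPC but are
# not written back to the configuration.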
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


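# The reboot LU below distinguishes three reboot types: soft and hard reboots
# are delegated to the hypervisor via the instance_reboot RPC, while a full
# reboot shuts the instance down, restarts its disks and starts it again
# from the master side.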
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


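# Shutdown ordering in the LU below: the instance is first marked down in the
# configuration, then the shutdown RPC is issued (a failure there is only
# logged as a warning), and finally the instance's disks are shut down.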
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


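# For the disk-recreation LU below, op.disks is a list of disk indices; an
# empty list means "recreate all disks", and any index outside the range of
# the instance's current disks is rejected.  The instance must be marked
# down and must not actually be running on its primary node.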
class LURecreateInstanceDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    if not isinstance(self.op.disks, list):
      raise errors.OpPrereqError("Invalid disks parameter")
    for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item))

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))
    else:
      for idx in self.op.disks:
        if idx >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    to_skip = []
    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.op.disks: # disk idx has not been passed in
        to_skip.append(idx)
        continue

    _CreateDisks(self, self.instance, to_skip=to_skip)


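# The rename LU below proceeds in stages: the instance is renamed in the
# configuration and its lock is swapped (safe while holding the BGL), the
# file-storage directory is renamed on the primary node for file-based
# instances, and finally the OS rename script is run with the disks
# temporarily activated.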
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


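# In the removal LU below, ignore_failures downgrades shutdown and
# disk-removal errors to warnings so the instance is still dropped from the
# configuration; without it, the first such failure aborts the operation.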
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.