#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity of the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as a purely lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer have to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, use an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]

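# Illustrative sketch (not part of the original module): a minimal concurrent
# LU following the rules documented above. The opcode field and class name are
# hypothetical; the point is how ExpandNames, DeclareLocks and the locking
# helpers fit together.
#
#   class LUExampleInstanceNoop(LogicalUnit):
#     HPATH = None
#     HTYPE = None
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # Expands self.op.instance_name and declares the instance-level lock
#       self._ExpandAndLockInstance()
#       # Node locks are computed later, once the instance lock is held
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Doing nothing for %s" % self.instance.name)
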
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError

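# Illustrative sketch (not part of the original module): an LU built entirely
# out of tasklets. The tasklet class and its arguments are hypothetical; the
# point is that the LU only handles locking and fills self.tasklets, while
# LogicalUnit.CheckPrereq and LogicalUnit.Exec iterate over the tasklets.
#
#   class _ExampleRebootTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Would reboot %s here" % self.instance.name)
#
#   class LUExampleRebootInstance(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.tasklets = [_ExampleRebootTasklet(self, self.op.instance_name)]
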
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))

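# Usage sketch (illustrative only, field names abridged): an LU that
# implements a query typically validates self.op.output_fields in
# ExpandNames or CheckPrereq along these lines:
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "dtotal"),
#                      selected=self.op.output_fields)
#
# Any selected field matching neither set raises errors.OpPrereqError.
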
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env

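# For example (illustrative values), an instance with one bridged NIC and one
# disk yields keys like:
#
#   INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES, INSTANCE_STATUS,
#   INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_IP, INSTANCE_NIC0_MAC,
#   INSTANCE_NIC0_MODE, INSTANCE_NIC0_LINK, INSTANCE_NIC0_BRIDGE,
#   INSTANCE_DISK_COUNT=1, INSTANCE_DISK0_SIZE, INSTANCE_DISK0_MODE,
#   plus INSTANCE_BE_<param> and INSTANCE_HV_<param> for each backend and
#   hypervisor parameter.
#
# As noted in BuildHooksEnv above, the hooks runner adds the GANETI_ prefix
# before exporting these to the hook scripts.
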
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty

class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master

class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the DRBD minors used on this node, in the form
        minor: (instance, must_exist), which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

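  # Worked example for the N+1 check above (hypothetical numbers): assume
  # node C has mfree = 3000 MB and is secondary for instances whose primary
  # is node A: inst1 (BE_MEMORY = 2048, auto-balanced) and inst2
  # (BE_MEMORY = 1024, auto-balanced). Then needed_mem for the (C, A) pair is
  # 2048 + 1024 = 3072 MB > 3000 MB, so a failure of node A could not be
  # absorbed by node C and the cluster is reported as not N+1 compliant.
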
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase, and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if msg:
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result

class LUVerifyDisks(NoHooksLU):
1440
  """Verifies the cluster disks status.
1441

1442
  """
1443
  _OP_REQP = []
1444
  REQ_BGL = False
1445

    
1446
  def ExpandNames(self):
1447
    self.needed_locks = {
1448
      locking.LEVEL_NODE: locking.ALL_SET,
1449
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1450
    }
1451
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1452

    
1453
  def CheckPrereq(self):
1454
    """Check prerequisites.
1455

1456
    This has no prerequisites.
1457

1458
    """
1459
    pass
1460

    
1461
  def Exec(self, feedback_fn):
1462
    """Verify integrity of cluster disks.
1463

1464
    @rtype: tuple of three items
1465
    @return: a tuple of (dict of node-to-node_error, list of instances
1466
        which need activate-disks, dict of instance: (node, volume) for
1467
        missing volumes)
1468

1469
    """
1470
    result = res_nodes, res_instances, res_missing = {}, [], {}
1471

    
1472
    vg_name = self.cfg.GetVGName()
1473
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1474
    instances = [self.cfg.GetInstanceInfo(name)
1475
                 for name in self.cfg.GetInstanceList()]
1476

    
1477
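    # Build a map from (node, logical volume name) to the owning instance,
    # covering only instances that are marked up and use a network-mirrored
    # disk template; other instances are irrelevant for this check.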
    nv_dict = {}
1478
    for inst in instances:
1479
      inst_lvs = {}
1480
      if (not inst.admin_up or
1481
          inst.disk_template not in constants.DTS_NET_MIRROR):
1482
        continue
1483
      inst.MapLVsByNode(inst_lvs)
1484
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1485
      for node, vol_list in inst_lvs.iteritems():
1486
        for vol in vol_list:
1487
          nv_dict[(node, vol)] = inst
1488

    
1489
    if not nv_dict:
1490
      return result
1491

    
1492
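    # Query all nodes in one RPC for the LVs in the cluster volume group;
    # offline nodes are skipped and per-node failures are recorded in
    # res_nodes instead of aborting the whole check.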
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1493

    
1494
    for node in nodes:
1495
      # node_volume
1496
      node_res = node_lvs[node]
1497
      if node_res.offline:
1498
        continue
1499
      msg = node_res.fail_msg
1500
      if msg:
1501
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1502
        res_nodes[node] = msg
1503
        continue
1504

    
1505
      lvs = node_res.payload
1506
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1507
        inst = nv_dict.pop((node, lv_name), None)
1508
        if (not lv_online and inst is not None
1509
            and inst.name not in res_instances):
1510
          res_instances.append(inst.name)
1511

    
1512
    # any leftover items in nv_dict are missing LVs, let's arrange the
1513
    # data better
1514
    for key, inst in nv_dict.iteritems():
1515
      if inst.name not in res_missing:
1516
        res_missing[inst.name] = []
1517
      res_missing[inst.name].append(key)
1518

    
1519
    return result
1520

    
1521

    
1522
class LURepairDiskSizes(NoHooksLU):
1523
  """Verifies the cluster disks sizes.
1524

1525
  """
1526
  _OP_REQP = ["instances"]
1527
  REQ_BGL = False
1528

    
1529
  def ExpandNames(self):
1530

    
1531
    if not isinstance(self.op.instances, list):
1532
      raise errors.OpPrereqError("Invalid argument type 'instances'")
1533

    
1534
    if self.op.instances:
1535
      self.wanted_names = []
1536
      for name in self.op.instances:
1537
        full_name = self.cfg.ExpandInstanceName(name)
1538
        if full_name is None:
1539
          raise errors.OpPrereqError("Instance '%s' not known" % name)
1540
        self.wanted_names.append(full_name)
1541
      self.needed_locks = {
1543
        locking.LEVEL_NODE: [],
1544
        locking.LEVEL_INSTANCE: self.wanted_names,
1545
        }
1546
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1547
    else:
1548
      self.wanted_names = None
1549
      self.needed_locks = {
1550
        locking.LEVEL_NODE: locking.ALL_SET,
1551
        locking.LEVEL_INSTANCE: locking.ALL_SET,
1552
        }
1553
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1554

    
1555
  def DeclareLocks(self, level):
1556
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
1557
      self._LockInstancesNodes(primary_only=True)
1558

    
1559
  def CheckPrereq(self):
1560
    """Check prerequisites.
1561

1562
    This only checks the optional instance list against the existing names.
1563

1564
    """
1565
    if self.wanted_names is None:
1566
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1567

    
1568
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1569
                             in self.wanted_names]
1570

    
1571
  def Exec(self, feedback_fn):
1572
    """Verify the size of cluster disks.
1573

1574
    """
1575
    # TODO: check child disks too
1576
    # TODO: check differences in size between primary/secondary nodes
1577
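    # Group the disks of the selected instances by primary node, so that
    # each node is asked for all of its disk sizes in a single RPC below.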
    per_node_disks = {}
1578
    for instance in self.wanted_instances:
1579
      pnode = instance.primary_node
1580
      if pnode not in per_node_disks:
1581
        per_node_disks[pnode] = []
1582
      for idx, disk in enumerate(instance.disks):
1583
        per_node_disks[pnode].append((instance, idx, disk))
1584

    
1585
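    # Compare the sizes reported by each node (converted from bytes to MiB)
    # with the sizes recorded in the configuration, and update the
    # configuration for any disk whose recorded size is wrong.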
    changed = []
1586
    for node, dskl in per_node_disks.items():
1587
      result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1588
      if result.failed:
1589
        self.LogWarning("Failure in blockdev_getsizes call to node"
1590
                        " %s, ignoring", node)
1591
        continue
1592
      if len(result.data) != len(dskl):
1593
        self.LogWarning("Invalid result from node %s, ignoring node results",
1594
                        node)
1595
        continue
1596
      for ((instance, idx, disk), size) in zip(dskl, result.data):
1597
        if size is None:
1598
          self.LogWarning("Disk %d of instance %s did not return size"
1599
                          " information, ignoring", idx, instance.name)
1600
          continue
1601
        if not isinstance(size, (int, long)):
1602
          self.LogWarning("Disk %d of instance %s did not return valid"
1603
                          " size information, ignoring", idx, instance.name)
1604
          continue
1605
        size = size >> 20
1606
        if size != disk.size:
1607
          self.LogInfo("Disk %d of instance %s has mismatched size,"
1608
                       " correcting: recorded %d, actual %d", idx,
1609
                       instance.name, disk.size, size)
1610
          disk.size = size
1611
          self.cfg.Update(instance)
1612
          changed.append((instance.name, idx, size))
1613
    return changed
1614

    
1615

    
1616
class LURenameCluster(LogicalUnit):
1617
  """Rename the cluster.
1618

1619
  """
1620
  HPATH = "cluster-rename"
1621
  HTYPE = constants.HTYPE_CLUSTER
1622
  _OP_REQP = ["name"]
1623

    
1624
  def BuildHooksEnv(self):
1625
    """Build hooks env.
1626

1627
    """
1628
    env = {
1629
      "OP_TARGET": self.cfg.GetClusterName(),
1630
      "NEW_NAME": self.op.name,
1631
      }
1632
    mn = self.cfg.GetMasterNode()
1633
    return env, [mn], [mn]
1634

    
1635
  def CheckPrereq(self):
1636
    """Verify that the passed name is a valid one.
1637

1638
    """
1639
    hostname = utils.HostInfo(self.op.name)
1640

    
1641
    new_name = hostname.name
1642
    self.ip = new_ip = hostname.ip
1643
    old_name = self.cfg.GetClusterName()
1644
    old_ip = self.cfg.GetMasterIP()
1645
    if new_name == old_name and new_ip == old_ip:
1646
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1647
                                 " cluster has changed")
1648
    if new_ip != old_ip:
1649
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1650
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1651
                                   " reachable on the network. Aborting." %
1652
                                   new_ip)
1653

    
1654
    self.op.name = new_name
1655

    
1656
  def Exec(self, feedback_fn):
1657
    """Rename the cluster.
1658

1659
    """
1660
    clustername = self.op.name
1661
    ip = self.ip
1662

    
1663
    # shutdown the master IP
1664
    master = self.cfg.GetMasterNode()
1665
    result = self.rpc.call_node_stop_master(master, False)
1666
    result.Raise("Could not disable the master role")
1667

    
1668
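    # While the master IP is down, update the cluster name/IP in the
    # configuration and push the regenerated known_hosts file to all other
    # nodes; the finally clause below restarts the master role either way.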
    try:
1669
      cluster = self.cfg.GetClusterInfo()
1670
      cluster.cluster_name = clustername
1671
      cluster.master_ip = ip
1672
      self.cfg.Update(cluster)
1673

    
1674
      # update the known hosts file
1675
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1676
      node_list = self.cfg.GetNodeList()
1677
      try:
1678
        node_list.remove(master)
1679
      except ValueError:
1680
        pass
1681
      result = self.rpc.call_upload_file(node_list,
1682
                                         constants.SSH_KNOWN_HOSTS_FILE)
1683
      for to_node, to_result in result.iteritems():
1684
        msg = to_result.fail_msg
1685
        if msg:
1686
          msg = ("Copy of file %s to node %s failed: %s" %
1687
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1688
          self.proc.LogWarning(msg)
1689

    
1690
    finally:
1691
      result = self.rpc.call_node_start_master(master, False, False)
1692
      msg = result.fail_msg
1693
      if msg:
1694
        self.LogWarning("Could not re-enable the master role on"
1695
                        " the master, please restart manually: %s", msg)
1696

    
1697

    
1698
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
1712

    
1713

    
1714
class LUSetClusterParams(LogicalUnit):
1715
  """Change the parameters of the cluster.
1716

1717
  """
1718
  HPATH = "cluster-modify"
1719
  HTYPE = constants.HTYPE_CLUSTER
1720
  _OP_REQP = []
1721
  REQ_BGL = False
1722

    
1723
  def CheckArguments(self):
1724
    """Check parameters
1725

1726
    """
1727
    if not hasattr(self.op, "candidate_pool_size"):
1728
      self.op.candidate_pool_size = None
1729
    if self.op.candidate_pool_size is not None:
1730
      try:
1731
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1732
      except (ValueError, TypeError), err:
1733
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1734
                                   str(err))
1735
      if self.op.candidate_pool_size < 1:
1736
        raise errors.OpPrereqError("At least one master candidate needed")
1737

    
1738
  def ExpandNames(self):
1739
    # FIXME: in the future maybe other cluster params won't require checking on
1740
    # all nodes to be modified.
1741
    self.needed_locks = {
1742
      locking.LEVEL_NODE: locking.ALL_SET,
1743
    }
1744
    self.share_locks[locking.LEVEL_NODE] = 1
1745

    
1746
  def BuildHooksEnv(self):
1747
    """Build hooks env.
1748

1749
    """
1750
    env = {
1751
      "OP_TARGET": self.cfg.GetClusterName(),
1752
      "NEW_VG_NAME": self.op.vg_name,
1753
      }
1754
    mn = self.cfg.GetMasterNode()
1755
    return env, [mn], [mn]
1756

    
1757
  def CheckPrereq(self):
1758
    """Check prerequisites.
1759

1760
    This checks whether the given params don't conflict and
1761
    if the given volume group is valid.
1762

1763
    """
1764
    if self.op.vg_name is not None and not self.op.vg_name:
1765
      instances = self.cfg.GetAllInstancesInfo().values()
1766
      for inst in instances:
1767
        for disk in inst.disks:
1768
          if _RecursiveCheckIfLVMBased(disk):
1769
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1770
                                       " lvm-based instances exist")
1771

    
1772
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1773

    
1774
    # if vg_name not None, checks given volume group on all nodes
1775
    if self.op.vg_name:
1776
      vglist = self.rpc.call_vg_list(node_list)
1777
      for node in node_list:
1778
        msg = vglist[node].fail_msg
1779
        if msg:
1780
          # ignoring down node
1781
          self.LogWarning("Error while gathering data on node %s"
1782
                          " (ignoring node): %s", node, msg)
1783
          continue
1784
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1785
                                              self.op.vg_name,
1786
                                              constants.MIN_VG_SIZE)
1787
        if vgstatus:
1788
          raise errors.OpPrereqError("Error on node '%s': %s" %
1789
                                     (node, vgstatus))
1790

    
1791
    self.cluster = cluster = self.cfg.GetClusterInfo()
1792
    # validate params changes
1793
    if self.op.beparams:
1794
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1795
      self.new_beparams = objects.FillDict(
1796
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1797

    
1798
    if self.op.nicparams:
1799
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1800
      self.new_nicparams = objects.FillDict(
1801
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1802
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1803

    
1804
    # hypervisor list/parameters
1805
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1806
    if self.op.hvparams:
1807
      if not isinstance(self.op.hvparams, dict):
1808
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1809
      for hv_name, hv_dict in self.op.hvparams.items():
1810
        if hv_name not in self.new_hvparams:
1811
          self.new_hvparams[hv_name] = hv_dict
1812
        else:
1813
          self.new_hvparams[hv_name].update(hv_dict)
1814

    
1815
    if self.op.enabled_hypervisors is not None:
1816
      self.hv_list = self.op.enabled_hypervisors
1817
      if not self.hv_list:
1818
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1819
                                   " least one member")
1820
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1821
      if invalid_hvs:
1822
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1823
                                   " entries: %s" %
1824
                                   utils.CommaJoin(invalid_hvs))
1825
    else:
1826
      self.hv_list = cluster.enabled_hypervisors
1827

    
1828
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1829
      # either the enabled list has changed, or the parameters have, validate
1830
      for hv_name, hv_params in self.new_hvparams.items():
1831
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1832
            (self.op.enabled_hypervisors and
1833
             hv_name in self.op.enabled_hypervisors)):
1834
          # either this is a new hypervisor, or its parameters have changed
1835
          hv_class = hypervisor.GetHypervisor(hv_name)
1836
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1837
          hv_class.CheckParameterSyntax(hv_params)
1838
          _CheckHVParams(self, node_list, hv_name, hv_params)
1839

    
1840
  def Exec(self, feedback_fn):
1841
    """Change the parameters of the cluster.
1842

1843
    """
1844
    if self.op.vg_name is not None:
1845
      new_volume = self.op.vg_name
1846
      if not new_volume:
1847
        new_volume = None
1848
      if new_volume != self.cfg.GetVGName():
1849
        self.cfg.SetVGName(new_volume)
1850
      else:
1851
        feedback_fn("Cluster LVM configuration already in desired"
1852
                    " state, not changing")
1853
    if self.op.hvparams:
1854
      self.cluster.hvparams = self.new_hvparams
1855
    if self.op.enabled_hypervisors is not None:
1856
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1857
    if self.op.beparams:
1858
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1859
    if self.op.nicparams:
1860
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1861

    
1862
    if self.op.candidate_pool_size is not None:
1863
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1864
      # we need to update the pool size here, otherwise the save will fail
1865
      _AdjustCandidatePool(self)
1866

    
1867
    self.cfg.Update(self.cluster)
1868

    
1869

    
1870
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1871
  """Distribute additional files which are part of the cluster configuration.
1872

1873
  ConfigWriter takes care of distributing the config and ssconf files, but
1874
  there are more files which should be distributed to all nodes. This function
1875
  makes sure those are copied.
1876

1877
  @param lu: calling logical unit
1878
  @param additional_nodes: list of nodes not in the config to distribute to
1879

1880
  """
1881
  # 1. Gather target nodes
1882
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1883
  dist_nodes = lu.cfg.GetNodeList()
1884
  if additional_nodes is not None:
1885
    dist_nodes.extend(additional_nodes)
1886
  if myself.name in dist_nodes:
1887
    dist_nodes.remove(myself.name)
1888
  # 2. Gather files to distribute
1889
  dist_files = set([constants.ETC_HOSTS,
1890
                    constants.SSH_KNOWN_HOSTS_FILE,
1891
                    constants.RAPI_CERT_FILE,
1892
                    constants.RAPI_USERS_FILE,
1893
                    constants.HMAC_CLUSTER_KEY,
1894
                   ])
1895

    
1896
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1897
  for hv_name in enabled_hypervisors:
1898
    hv_class = hypervisor.GetHypervisor(hv_name)
1899
    dist_files.update(hv_class.GetAncillaryFiles())
1900

    
1901
  # 3. Perform the files upload
1902
  for fname in dist_files:
1903
    if os.path.exists(fname):
1904
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1905
      for to_node, to_result in result.items():
1906
        msg = to_result.fail_msg
1907
        if msg:
1908
          msg = ("Copy of file %s to node %s failed: %s" %
1909
                 (fname, to_node, msg))
1910
          lu.proc.LogWarning(msg)
1911

    
1912

    
1913
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)
1939

    
1940

    
1941
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1942
  """Sleep and poll for an instance's disk to sync.
1943

1944
  """
1945
  if not instance.disks:
1946
    return True
1947

    
1948
  if not oneshot:
1949
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1950

    
1951
  node = instance.primary_node
1952

    
1953
  for dev in instance.disks:
1954
    lu.cfg.SetDiskID(dev, node)
1955

    
1956
  retries = 0
1957
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1958
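  # Main polling loop: each pass asks the primary node for the mirror status
  # of all disks, tolerates up to 10 consecutive RPC failures, and then
  # sleeps for the largest estimated sync time (capped at 60 seconds).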
  while True:
1959
    max_time = 0
1960
    done = True
1961
    cumul_degraded = False
1962
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1963
    msg = rstats.fail_msg
1964
    if msg:
1965
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1966
      retries += 1
1967
      if retries >= 10:
1968
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1969
                                 " aborting." % node)
1970
      time.sleep(6)
1971
      continue
1972
    rstats = rstats.payload
1973
    retries = 0
1974
    for i, mstat in enumerate(rstats):
1975
      if mstat is None:
1976
        lu.LogWarning("Can't compute data for node %s/%s",
1977
                           node, instance.disks[i].iv_name)
1978
        continue
1979

    
1980
      cumul_degraded = (cumul_degraded or
1981
                        (mstat.is_degraded and mstat.sync_percent is None))
1982
      if mstat.sync_percent is not None:
1983
        done = False
1984
        if mstat.estimated_time is not None:
1985
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
1986
          max_time = mstat.estimated_time
1987
        else:
1988
          rem_time = "no time estimate"
1989
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))
1991

    
1992
    # if we're done but degraded, let's do a few small retries, to
1993
    # make sure we see a stable and not transient situation; therefore
1994
    # we force restart of the loop
1995
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1996
      logging.info("Degraded disks found, %d retries left", degr_retries)
1997
      degr_retries -= 1
1998
      time.sleep(1)
1999
      continue
2000

    
2001
    if done or oneshot:
2002
      break
2003

    
2004
    time.sleep(min(60, max_time))
2005

    
2006
  if done:
2007
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2008
  return not cumul_degraded
2009

    
2010

    
2011
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2012
  """Check that mirrors are not degraded.
2013

2014
  The ldisk parameter, if True, will change the test from the
2015
  is_degraded attribute (which represents overall non-ok status for
2016
  the device(s)) to the ldisk (representing the local storage status).
2017

2018
  """
2019
  lu.cfg.SetDiskID(dev, node)
2020

    
2021
  result = True
2022

    
2023
  if on_primary or dev.AssembleOnSecondary():
2024
    rstats = lu.rpc.call_blockdev_find(node, dev)
2025
    msg = rstats.fail_msg
2026
    if msg:
2027
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2028
      result = False
2029
    elif not rstats.payload:
2030
      lu.LogWarning("Can't find disk on node %s", node)
2031
      result = False
2032
    else:
2033
      if ldisk:
2034
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2035
      else:
2036
        result = result and not rstats.payload.is_degraded
2037

    
2038
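  # A device is only considered consistent if all of its children (e.g. the
  # LVs backing a DRBD device) pass the same check on this node.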
  if dev.children:
2039
    for child in dev.children:
2040
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2041

    
2042
  return result
2043

    
2044

    
2045
class LUDiagnoseOS(NoHooksLU):
2046
  """Logical unit for OS diagnose/query.
2047

2048
  """
2049
  _OP_REQP = ["output_fields", "names"]
2050
  REQ_BGL = False
2051
  _FIELDS_STATIC = utils.FieldSet()
2052
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2053

    
2054
  def ExpandNames(self):
2055
    if self.op.names:
2056
      raise errors.OpPrereqError("Selective OS query not supported")
2057

    
2058
    _CheckOutputFields(static=self._FIELDS_STATIC,
2059
                       dynamic=self._FIELDS_DYNAMIC,
2060
                       selected=self.op.output_fields)
2061

    
2062
    # Lock all nodes, in shared mode
2063
    # Temporary removal of locks, should be reverted later
2064
    # TODO: reintroduce locks when they are lighter-weight
2065
    self.needed_locks = {}
2066
    #self.share_locks[locking.LEVEL_NODE] = 1
2067
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2068

    
2069
  def CheckPrereq(self):
2070
    """Check prerequisites.
2071

2072
    """
2073

    
2074
  @staticmethod
2075
  def _DiagnoseByOS(node_list, rlist):
2076
    """Remaps a per-node return list into an a per-os per-node dictionary
2077

2078
    @param node_list: a list with the names of all nodes
2079
    @param rlist: a map with node names as keys and OS objects as values
2080

2081
    @rtype: dict
2082
    @return: a dictionary with osnames as keys and as value another map, with
2083
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
2084

2085
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2086
                                     (/srv/..., False, "invalid api")],
2087
                           "node2": [(/srv/..., True, "")]}
2088
          }
2089

2090
    """
2091
    all_os = {}
2092
    # we build here the list of nodes that didn't fail the RPC (at RPC
2093
    # level), so that nodes with a non-responding node daemon don't
2094
    # make all OSes invalid
2095
    good_nodes = [node_name for node_name in rlist
2096
                  if not rlist[node_name].fail_msg]
2097
    for node_name, nr in rlist.items():
2098
      if nr.fail_msg or not nr.payload:
2099
        continue
2100
      for name, path, status, diagnose in nr.payload:
2101
        if name not in all_os:
2102
          # build a list of nodes for this os containing empty lists
2103
          # for each node in node_list
2104
          all_os[name] = {}
2105
          for nname in good_nodes:
2106
            all_os[name][nname] = []
2107
        all_os[name][node_name].append((path, status, diagnose))
2108
    return all_os
2109

    
2110
  def Exec(self, feedback_fn):
2111
    """Compute the list of OSes.
2112

2113
    """
2114
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2115
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2116
    pol = self._DiagnoseByOS(valid_nodes, node_data)
2117
    output = []
2118
    for os_name, os_data in pol.items():
2119
      row = []
2120
      for field in self.op.output_fields:
2121
        if field == "name":
2122
          val = os_name
2123
        elif field == "valid":
2124
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2125
        elif field == "node_status":
2126
          # this is just a copy of the dict
2127
          val = {}
2128
          for node_name, nos_list in os_data.items():
2129
            val[node_name] = nos_list
2130
        else:
2131
          raise errors.ParameterError(field)
2132
        row.append(val)
2133
      output.append(row)
2134

    
2135
    return output
2136

    
2137

    
2138
class LURemoveNode(LogicalUnit):
2139
  """Logical unit for removing a node.
2140

2141
  """
2142
  HPATH = "node-remove"
2143
  HTYPE = constants.HTYPE_NODE
2144
  _OP_REQP = ["node_name"]
2145

    
2146
  def BuildHooksEnv(self):
2147
    """Build hooks env.
2148

2149
    This doesn't run on the target node in the pre phase as a failed
2150
    node would then be impossible to remove.
2151

2152
    """
2153
    env = {
2154
      "OP_TARGET": self.op.node_name,
2155
      "NODE_NAME": self.op.node_name,
2156
      }
2157
    all_nodes = self.cfg.GetNodeList()
2158
    if self.op.node_name in all_nodes:
2159
      all_nodes.remove(self.op.node_name)
2160
    return env, all_nodes, all_nodes
2161

    
2162
  def CheckPrereq(self):
2163
    """Check prerequisites.
2164

2165
    This checks:
2166
     - the node exists in the configuration
2167
     - it does not have primary or secondary instances
2168
     - it's not the master
2169

2170
    Any errors are signaled by raising errors.OpPrereqError.
2171

2172
    """
2173
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2174
    if node is None:
2175
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2176

    
2177
    instance_list = self.cfg.GetInstanceList()
2178

    
2179
    masternode = self.cfg.GetMasterNode()
2180
    if node.name == masternode:
2181
      raise errors.OpPrereqError("Node is the master node,"
2182
                                 " you need to failover first.")
2183

    
2184
    for instance_name in instance_list:
2185
      instance = self.cfg.GetInstanceInfo(instance_name)
2186
      if node.name in instance.all_nodes:
2187
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2188
                                   " please remove first." % instance_name)
2189
    self.op.node_name = node.name
2190
    self.node = node
2191

    
2192
  def Exec(self, feedback_fn):
2193
    """Removes the node from the cluster.
2194

2195
    """
2196
    node = self.node
2197
    logging.info("Stopping the node daemon and removing configs from node %s",
2198
                 node.name)
2199

    
2200
    self.context.RemoveNode(node.name)
2201

    
2202
    # Run post hooks on the node before it's removed
2203
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2204
    try:
2205
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2206
    except:
2207
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2208

    
2209
    result = self.rpc.call_node_leave_cluster(node.name)
2210
    msg = result.fail_msg
2211
    if msg:
2212
      self.LogWarning("Errors encountered on the remote node while leaving"
2213
                      " the cluster: %s", msg)
2214

    
2215
    # Promote nodes to master candidate as needed
2216
    _AdjustCandidatePool(self)
2217

    
2218

    
2219
class LUQueryNodes(NoHooksLU):
2220
  """Logical unit for querying nodes.
2221

2222
  """
2223
  _OP_REQP = ["output_fields", "names", "use_locking"]
2224
  REQ_BGL = False
2225
  _FIELDS_DYNAMIC = utils.FieldSet(
2226
    "dtotal", "dfree",
2227
    "mtotal", "mnode", "mfree",
2228
    "bootid",
2229
    "ctotal", "cnodes", "csockets",
2230
    )
2231

    
2232
  _FIELDS_STATIC = utils.FieldSet(
2233
    "name", "pinst_cnt", "sinst_cnt",
2234
    "pinst_list", "sinst_list",
2235
    "pip", "sip", "tags",
2236
    "serial_no", "ctime", "mtime",
2237
    "master_candidate",
2238
    "master",
2239
    "offline",
2240
    "drained",
2241
    "role",
2242
    )
2243

    
2244
  def ExpandNames(self):
2245
    _CheckOutputFields(static=self._FIELDS_STATIC,
2246
                       dynamic=self._FIELDS_DYNAMIC,
2247
                       selected=self.op.output_fields)
2248

    
2249
    self.needed_locks = {}
2250
    self.share_locks[locking.LEVEL_NODE] = 1
2251

    
2252
    if self.op.names:
2253
      self.wanted = _GetWantedNodes(self, self.op.names)
2254
    else:
2255
      self.wanted = locking.ALL_SET
2256

    
2257
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2258
    self.do_locking = self.do_node_query and self.op.use_locking
2259
    if self.do_locking:
2260
      # if we don't request only static fields, we need to lock the nodes
2261
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2262

    
2263

    
2264
  def CheckPrereq(self):
2265
    """Check prerequisites.
2266

2267
    """
2268
    # The validation of the node list is done in the _GetWantedNodes,
2269
    # if non empty, and if empty, there's no validation to do
2270
    pass
2271

    
2272
  def Exec(self, feedback_fn):
2273
    """Computes the list of nodes and their attributes.
2274

2275
    """
2276
    all_info = self.cfg.GetAllNodesInfo()
2277
    if self.do_locking:
2278
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2279
    elif self.wanted != locking.ALL_SET:
2280
      nodenames = self.wanted
2281
      missing = set(nodenames).difference(all_info.keys())
2282
      if missing:
2283
        raise errors.OpExecError(
2284
          "Some nodes were removed before retrieving their data: %s" % missing)
2285
    else:
2286
      nodenames = all_info.keys()
2287

    
2288
    nodenames = utils.NiceSort(nodenames)
2289
    nodelist = [all_info[name] for name in nodenames]
2290

    
2291
    # begin data gathering
2292

    
2293
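    # Dynamic fields (memory, disk and cpu data) need a live node_info RPC;
    # purely static fields are answered from the configuration alone.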
    if self.do_node_query:
2294
      live_data = {}
2295
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2296
                                          self.cfg.GetHypervisorType())
2297
      for name in nodenames:
2298
        nodeinfo = node_data[name]
2299
        if not nodeinfo.fail_msg and nodeinfo.payload:
2300
          nodeinfo = nodeinfo.payload
2301
          fn = utils.TryConvert
2302
          live_data[name] = {
2303
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2304
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2305
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2306
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2307
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2308
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2309
            "bootid": nodeinfo.get('bootid', None),
2310
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2311
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2312
            }
2313
        else:
2314
          live_data[name] = {}
2315
    else:
2316
      live_data = dict.fromkeys(nodenames, {})
2317

    
2318
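    # Build reverse mappings from node name to the sets of primary and
    # secondary instances, filled in only if instance fields were requested.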
    node_to_primary = dict([(name, set()) for name in nodenames])
2319
    node_to_secondary = dict([(name, set()) for name in nodenames])
2320

    
2321
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2322
                             "sinst_cnt", "sinst_list"))
2323
    if inst_fields & frozenset(self.op.output_fields):
2324
      instancelist = self.cfg.GetInstanceList()
2325

    
2326
      for instance_name in instancelist:
2327
        inst = self.cfg.GetInstanceInfo(instance_name)
2328
        if inst.primary_node in node_to_primary:
2329
          node_to_primary[inst.primary_node].add(inst.name)
2330
        for secnode in inst.secondary_nodes:
2331
          if secnode in node_to_secondary:
2332
            node_to_secondary[secnode].add(inst.name)
2333

    
2334
    master_node = self.cfg.GetMasterNode()
2335

    
2336
    # end data gathering
2337

    
2338
    output = []
2339
    for node in nodelist:
2340
      node_output = []
2341
      for field in self.op.output_fields:
2342
        if field == "name":
2343
          val = node.name
2344
        elif field == "pinst_list":
2345
          val = list(node_to_primary[node.name])
2346
        elif field == "sinst_list":
2347
          val = list(node_to_secondary[node.name])
2348
        elif field == "pinst_cnt":
2349
          val = len(node_to_primary[node.name])
2350
        elif field == "sinst_cnt":
2351
          val = len(node_to_secondary[node.name])
2352
        elif field == "pip":
2353
          val = node.primary_ip
2354
        elif field == "sip":
2355
          val = node.secondary_ip
2356
        elif field == "tags":
2357
          val = list(node.GetTags())
2358
        elif field == "serial_no":
2359
          val = node.serial_no
2360
        elif field == "ctime":
2361
          val = node.ctime
2362
        elif field == "mtime":
2363
          val = node.mtime
2364
        elif field == "master_candidate":
2365
          val = node.master_candidate
2366
        elif field == "master":
2367
          val = node.name == master_node
2368
        elif field == "offline":
2369
          val = node.offline
2370
        elif field == "drained":
2371
          val = node.drained
2372
        elif self._FIELDS_DYNAMIC.Matches(field):
2373
          val = live_data[node.name].get(field, None)
2374
        elif field == "role":
2375
          if node.name == master_node:
2376
            val = "M"
2377
          elif node.master_candidate:
2378
            val = "C"
2379
          elif node.drained:
2380
            val = "D"
2381
          elif node.offline:
2382
            val = "O"
2383
          else:
2384
            val = "R"
2385
        else:
2386
          raise errors.ParameterError(field)
2387
        node_output.append(val)
2388
      output.append(node_output)
2389

    
2390
    return output
2391

    
2392

    
2393
class LUQueryNodeVolumes(NoHooksLU):
2394
  """Logical unit for getting volumes on node(s).
2395

2396
  """
2397
  _OP_REQP = ["nodes", "output_fields"]
2398
  REQ_BGL = False
2399
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2400
  _FIELDS_STATIC = utils.FieldSet("node")
2401

    
2402
  def ExpandNames(self):
2403
    _CheckOutputFields(static=self._FIELDS_STATIC,
2404
                       dynamic=self._FIELDS_DYNAMIC,
2405
                       selected=self.op.output_fields)
2406

    
2407
    self.needed_locks = {}
2408
    self.share_locks[locking.LEVEL_NODE] = 1
2409
    if not self.op.nodes:
2410
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2411
    else:
2412
      self.needed_locks[locking.LEVEL_NODE] = \
2413
        _GetWantedNodes(self, self.op.nodes)
2414

    
2415
  def CheckPrereq(self):
2416
    """Check prerequisites.
2417

2418
    This checks that the fields required are valid output fields.
2419

2420
    """
2421
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2422

    
2423
  def Exec(self, feedback_fn):
2424
    """Computes the list of nodes and their attributes.
2425

2426
    """
2427
    nodenames = self.nodes
2428
    volumes = self.rpc.call_node_volumes(nodenames)
2429

    
2430
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2431
             in self.cfg.GetInstanceList()]
2432

    
2433
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2434

    
2435
    output = []
2436
    for node in nodenames:
2437
      nresult = volumes[node]
2438
      if nresult.offline:
2439
        continue
2440
      msg = nresult.fail_msg
2441
      if msg:
2442
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2443
        continue
2444

    
2445
      node_vols = nresult.payload[:]
2446
      node_vols.sort(key=lambda vol: vol['dev'])
2447

    
2448
      for vol in node_vols:
2449
        node_output = []
2450
        for field in self.op.output_fields:
2451
          if field == "node":
2452
            val = node
2453
          elif field == "phys":
2454
            val = vol['dev']
2455
          elif field == "vg":
2456
            val = vol['vg']
2457
          elif field == "name":
2458
            val = vol['name']
2459
          elif field == "size":
2460
            val = int(float(vol['size']))
2461
          elif field == "instance":
2462
            for inst in ilist:
2463
              if node not in lv_by_node[inst]:
2464
                continue
2465
              if vol['name'] in lv_by_node[inst][node]:
2466
                val = inst.name
2467
                break
2468
            else:
2469
              val = '-'
2470
          else:
2471
            raise errors.ParameterError(field)
2472
          node_output.append(str(val))
2473

    
2474
        output.append(node_output)
2475

    
2476
    return output
2477

    
2478

    
2479
class LUQueryNodeStorage(NoHooksLU):
2480
  """Logical unit for getting information on storage units on node(s).
2481

2482
  """
2483
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
2484
  REQ_BGL = False
2485
  _FIELDS_STATIC = utils.FieldSet("node")
2486

    
2487
  def ExpandNames(self):
2488
    storage_type = self.op.storage_type
2489

    
2490
    if storage_type not in constants.VALID_STORAGE_FIELDS:
2491
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2492

    
2493
    dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2494

    
2495
    _CheckOutputFields(static=self._FIELDS_STATIC,
2496
                       dynamic=utils.FieldSet(*dynamic_fields),
2497
                       selected=self.op.output_fields)
2498

    
2499
    self.needed_locks = {}
2500
    self.share_locks[locking.LEVEL_NODE] = 1
2501

    
2502
    if self.op.nodes:
2503
      self.needed_locks[locking.LEVEL_NODE] = \
2504
        _GetWantedNodes(self, self.op.nodes)
2505
    else:
2506
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2507

    
2508
  def CheckPrereq(self):
2509
    """Check prerequisites.
2510

2511
    This checks that the fields required are valid output fields.
2512

2513
    """
2514
    self.op.name = getattr(self.op, "name", None)
2515

    
2516
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2517

    
2518
  def Exec(self, feedback_fn):
2519
    """Computes the list of nodes and their attributes.
2520

2521
    """
2522
    # Always get name to sort by
2523
    if constants.SF_NAME in self.op.output_fields:
2524
      fields = self.op.output_fields[:]
2525
    else:
2526
      fields = [constants.SF_NAME] + self.op.output_fields
2527

    
2528
    # Never ask for node as it's only known to the LU
2529
    while "node" in fields:
2530
      fields.remove("node")
2531

    
2532
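    # Remember the column index of each requested field; the rows returned
    # by the storage_list RPC are later keyed and sorted by their name field.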
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2533
    name_idx = field_idx[constants.SF_NAME]
2534

    
2535
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2536
    data = self.rpc.call_storage_list(self.nodes,
2537
                                      self.op.storage_type, st_args,
2538
                                      self.op.name, fields)
2539

    
2540
    result = []
2541

    
2542
    for node in utils.NiceSort(self.nodes):
2543
      nresult = data[node]
2544
      if nresult.offline:
2545
        continue
2546

    
2547
      msg = nresult.fail_msg
2548
      if msg:
2549
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2550
        continue
2551

    
2552
      rows = dict([(row[name_idx], row) for row in nresult.payload])
2553

    
2554
      for name in utils.NiceSort(rows.keys()):
2555
        row = rows[name]
2556

    
2557
        out = []
2558

    
2559
        for field in self.op.output_fields:
2560
          if field == "node":
2561
            val = node
2562
          elif field in field_idx:
2563
            val = row[field_idx[field]]
2564
          else:
2565
            raise errors.ParameterError(field)
2566

    
2567
          out.append(val)
2568

    
2569
        result.append(out)
2570

    
2571
    return result
2572

    
2573

    
2574
class LUModifyNodeStorage(NoHooksLU):
2575
  """Logical unit for modifying a storage volume on a node.
2576

2577
  """
2578
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2579
  REQ_BGL = False
2580

    
2581
  def CheckArguments(self):
2582
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2583
    if node_name is None:
2584
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2585

    
2586
    self.op.node_name = node_name
2587

    
2588
    storage_type = self.op.storage_type
2589
    if storage_type not in constants.VALID_STORAGE_FIELDS:
2590
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2591

    
2592
  def ExpandNames(self):
2593
    self.needed_locks = {
2594
      locking.LEVEL_NODE: self.op.node_name,
2595
      }
2596

    
2597
  def CheckPrereq(self):
2598
    """Check prerequisites.
2599

2600
    """
2601
    storage_type = self.op.storage_type
2602

    
2603
    try:
2604
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2605
    except KeyError:
2606
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
2607
                                 " modified" % storage_type)
2608

    
2609
    diff = set(self.op.changes.keys()) - modifiable
2610
    if diff:
2611
      raise errors.OpPrereqError("The following fields can not be modified for"
2612
                                 " storage units of type '%s': %r" %
2613
                                 (storage_type, list(diff)))
2614

    
2615
  def Exec(self, feedback_fn):
2616
    """Computes the list of nodes and their attributes.
2617

2618
    """
2619
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2620
    result = self.rpc.call_storage_modify(self.op.node_name,
2621
                                          self.op.storage_type, st_args,
2622
                                          self.op.name, self.op.changes)
2623
    result.Raise("Failed to modify storage unit '%s' on %s" %
2624
                 (self.op.name, self.op.node_name))
2625

    
2626

    
2627
class LUAddNode(LogicalUnit):
2628
  """Logical unit for adding node to the cluster.
2629

2630
  """
2631
  HPATH = "node-add"
2632
  HTYPE = constants.HTYPE_NODE
2633
  _OP_REQP = ["node_name"]
2634

    
2635
  def BuildHooksEnv(self):
2636
    """Build hooks env.
2637

2638
    This will run on all nodes before, and on all nodes + the new node after.
2639

2640
    """
2641
    env = {
2642
      "OP_TARGET": self.op.node_name,
2643
      "NODE_NAME": self.op.node_name,
2644
      "NODE_PIP": self.op.primary_ip,
2645
      "NODE_SIP": self.op.secondary_ip,
2646
      }
2647
    nodes_0 = self.cfg.GetNodeList()
2648
    nodes_1 = nodes_0 + [self.op.node_name, ]
2649
    return env, nodes_0, nodes_1
2650

    
2651
  def CheckPrereq(self):
2652
    """Check prerequisites.
2653

2654
    This checks:
2655
     - the new node is not already in the config
2656
     - it is resolvable
2657
     - its parameters (single/dual homed) matches the cluster
2658

2659
    Any errors are signaled by raising errors.OpPrereqError.
2660

2661
    """
2662
    node_name = self.op.node_name
2663
    cfg = self.cfg
2664

    
2665
    dns_data = utils.HostInfo(node_name)
2666

    
2667
    node = dns_data.name
2668
    primary_ip = self.op.primary_ip = dns_data.ip
2669
    secondary_ip = getattr(self.op, "secondary_ip", None)
2670
    if secondary_ip is None:
2671
      secondary_ip = primary_ip
2672
    if not utils.IsValidIP(secondary_ip):
2673
      raise errors.OpPrereqError("Invalid secondary IP given")
2674
    self.op.secondary_ip = secondary_ip
2675

    
2676
    node_list = cfg.GetNodeList()
2677
    if not self.op.readd and node in node_list:
2678
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2679
                                 node)
2680
    elif self.op.readd and node not in node_list:
2681
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2682

    
2683
    for existing_node_name in node_list:
2684
      existing_node = cfg.GetNodeInfo(existing_node_name)
2685

    
2686
      if self.op.readd and node == existing_node_name:
2687
        if (existing_node.primary_ip != primary_ip or
2688
            existing_node.secondary_ip != secondary_ip):
2689
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2690
                                     " address configuration as before")
2691
        continue
2692

    
2693
      if (existing_node.primary_ip == primary_ip or
2694
          existing_node.secondary_ip == primary_ip or
2695
          existing_node.primary_ip == secondary_ip or
2696
          existing_node.secondary_ip == secondary_ip):
2697
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2698
                                   " existing node %s" % existing_node.name)
2699

    
2700
    # check that the type of the node (single versus dual homed) is the
2701
    # same as for the master
2702
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2703
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2704
    newbie_singlehomed = secondary_ip == primary_ip
2705
    if master_singlehomed != newbie_singlehomed:
2706
      if master_singlehomed:
2707
        raise errors.OpPrereqError("The master has no private ip but the"
2708
                                   " new node has one")
2709
      else:
2710
        raise errors.OpPrereqError("The master has a private ip but the"
2711
                                   " new node doesn't have one")
2712

    
2713
    # checks reachability
2714
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2715
      raise errors.OpPrereqError("Node not reachable by ping")
2716

    
2717
    if not newbie_singlehomed:
2718
      # check reachability from my secondary ip to newbie's secondary ip
2719
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2720
                           source=myself.secondary_ip):
2721
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2722
                                   " based ping to noded port")
2723

    
2724
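    # Decide whether the new node can be promoted to master candidate, based
    # on the configured candidate_pool_size and the current candidate count.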
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2725
    if self.op.readd:
2726
      exceptions = [node]
2727
    else:
2728
      exceptions = []
2729
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2730
    # the new node will increase mc_max with one, so:
2731
    mc_max = min(mc_max + 1, cp_size)
2732
    self.master_candidate = mc_now < mc_max
2733

    
2734
    if self.op.readd:
2735
      self.new_node = self.cfg.GetNodeInfo(node)
2736
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2737
    else:
2738
      self.new_node = objects.Node(name=node,
2739
                                   primary_ip=primary_ip,
2740
                                   secondary_ip=secondary_ip,
2741
                                   master_candidate=self.master_candidate,
2742
                                   offline=False, drained=False)
2743

    
2744
  def Exec(self, feedback_fn):
2745
    """Adds the new node to the cluster.
2746

2747
    """
2748
    new_node = self.new_node
2749
    node = new_node.name
2750

    
2751
    # for re-adds, reset the offline/drained/master-candidate flags;
2752
    # we need to reset here, otherwise offline would prevent RPC calls
2753
    # later in the procedure; this also means that if the re-add
2754
    # fails, we are left with a non-offlined, broken node
2755
    if self.op.readd:
2756
      new_node.drained = new_node.offline = False
2757
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2758
      # if we demote the node, we do cleanup later in the procedure
2759
      new_node.master_candidate = self.master_candidate
2760

    
2761
    # notify the user about any possible mc promotion
2762
    if new_node.master_candidate:
2763
      self.LogInfo("Node will be a master candidate")
2764

    
2765
    # check connectivity
2766
    result = self.rpc.call_version([node])[node]
2767
    result.Raise("Can't get version information from node %s" % node)
2768
    if constants.PROTOCOL_VERSION == result.payload:
2769
      logging.info("Communication to node %s fine, sw version %s match",
2770
                   node, result.payload)
2771
    else:
2772
      raise errors.OpExecError("Version mismatch master version %s,"
2773
                               " node version %s" %
2774
                               (constants.PROTOCOL_VERSION, result.payload))
2775

    
2776
    # setup ssh on node
2777
    logging.info("Copy ssh key to node %s", node)
2778
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2779
    keyarray = []
2780
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2781
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2782
                priv_key, pub_key]
2783

    
2784
    for i in keyfiles:
2785
      f = open(i, 'r')
2786
      try:
2787
        keyarray.append(f.read())
2788
      finally:
2789
        f.close()
2790

    
2791
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2792
                                    keyarray[2],
2793
                                    keyarray[3], keyarray[4], keyarray[5])
2794
    result.Raise("Cannot transfer ssh keys to the new node")
2795

    
2796
    # Add node to our /etc/hosts, and add key to known_hosts
2797
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2798
      utils.AddHostToEtcHosts(new_node.name)
2799

    
2800
    if new_node.secondary_ip != new_node.primary_ip:
2801
      result = self.rpc.call_node_has_ip_address(new_node.name,
2802
                                                 new_node.secondary_ip)
2803
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2804
                   prereq=True)
2805
      if not result.payload:
2806
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2807
                                 " you gave (%s). Please fix and re-run this"
2808
                                 " command." % new_node.secondary_ip)
2809

    
2810
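    # Have the master verify ssh/hostname connectivity to the new node
    # before it is actually added to the configuration.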
    node_verify_list = [self.cfg.GetMasterNode()]
2811
    node_verify_param = {
2812
      'nodelist': [node],
2813
      # TODO: do a node-net-test as well?
2814
    }
2815

    
2816
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2817
                                       self.cfg.GetClusterName())
2818
    for verifier in node_verify_list:
2819
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2820
      nl_payload = result[verifier].payload['nodelist']
2821
      if nl_payload:
2822
        for failed in nl_payload:
2823
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2824
                      (verifier, nl_payload[failed]))
2825
        raise errors.OpExecError("ssh/hostname verification failed.")
2826

    
2827
    if self.op.readd:
2828
      _RedistributeAncillaryFiles(self)
2829
      self.context.ReaddNode(new_node)
2830
      # make sure we redistribute the config
2831
      self.cfg.Update(new_node)
2832
      # and make sure the new node will not have old files around
2833
      if not new_node.master_candidate:
2834
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2835
        msg = result.fail_msg
2836
        if msg:
2837
          self.LogWarning("Node failed to demote itself from master"
2838
                          " candidate status: %s" % msg)
2839
    else:
2840
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2841
      self.context.AddNode(new_node)
2842

    
2843

    
2844
class LUSetNodeParams(LogicalUnit):
2845
  """Modifies the parameters of a node.
2846

2847
  """
2848
  HPATH = "node-modify"
2849
  HTYPE = constants.HTYPE_NODE
2850
  _OP_REQP = ["node_name"]
2851
  REQ_BGL = False
2852

    
2853
  def CheckArguments(self):
2854
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2855
    if node_name is None:
2856
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2857
    self.op.node_name = node_name
2858
    _CheckBooleanOpField(self.op, 'master_candidate')
2859
    _CheckBooleanOpField(self.op, 'offline')
2860
    _CheckBooleanOpField(self.op, 'drained')
2861
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2862
    if all_mods.count(None) == 3:
2863
      raise errors.OpPrereqError("Please pass at least one modification")
2864
    if all_mods.count(True) > 1:
2865
      raise errors.OpPrereqError("Can't set the node into more than one"
2866
                                 " state at the same time")
2867

    
2868
  def ExpandNames(self):
2869
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2870

    
2871
  def BuildHooksEnv(self):
2872
    """Build hooks env.
2873

2874
    This runs on the master node.
2875

2876
    """
2877
    env = {
2878
      "OP_TARGET": self.op.node_name,
2879
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2880
      "OFFLINE": str(self.op.offline),
2881
      "DRAINED": str(self.op.drained),
2882
      }
2883
    nl = [self.cfg.GetMasterNode(),
2884
          self.op.node_name]
2885
    return env, nl, nl
2886

    
2887
  def CheckPrereq(self):
2888
    """Check prerequisites.
2889

2890
    This only checks the instance list against the existing names.
2891

2892
    """
2893
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2894

    
2895
    if ((self.op.master_candidate == False or self.op.offline == True or
2896
         self.op.drained == True) and node.master_candidate):
2897
      # we will demote the node from master_candidate
2898
      if self.op.node_name == self.cfg.GetMasterNode():
2899
        raise errors.OpPrereqError("The master node has to be a"
2900
                                   " master candidate, online and not drained")
2901
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2902
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2903
      if num_candidates <= cp_size:
2904
        msg = ("Not enough master candidates (desired"
2905
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2906
        if self.op.force:
2907
          self.LogWarning(msg)
2908
        else:
2909
          raise errors.OpPrereqError(msg)
2910

    
2911
    if (self.op.master_candidate == True and
2912
        ((node.offline and not self.op.offline == False) or
2913
         (node.drained and not self.op.drained == False))):
2914
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2915
                                 " to master_candidate" % node.name)
2916

    
2917
    return
2918

    
2919
  def Exec(self, feedback_fn):
2920
    """Modifies a node.
2921

2922
    """
2923
    node = self.node
2924

    
2925
    result = []
2926
    changed_mc = False
2927

    
2928
    if self.op.offline is not None:
2929
      node.offline = self.op.offline
2930
      result.append(("offline", str(self.op.offline)))
2931
      if self.op.offline == True:
2932
        if node.master_candidate:
2933
          node.master_candidate = False
2934
          changed_mc = True
2935
          result.append(("master_candidate", "auto-demotion due to offline"))
2936
        if node.drained:
2937
          node.drained = False
2938
          result.append(("drained", "clear drained status due to offline"))
2939

    
2940
    if self.op.master_candidate is not None:
2941
      node.master_candidate = self.op.master_candidate
2942
      changed_mc = True
2943
      result.append(("master_candidate", str(self.op.master_candidate)))
2944
      if self.op.master_candidate == False:
2945
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2946
        msg = rrc.fail_msg
2947
        if msg:
2948
          self.LogWarning("Node failed to demote itself: %s" % msg)
2949

    
2950
    if self.op.drained is not None:
2951
      node.drained = self.op.drained
2952
      result.append(("drained", str(self.op.drained)))
2953
      if self.op.drained == True:
2954
        if node.master_candidate:
2955
          node.master_candidate = False
2956
          changed_mc = True
2957
          result.append(("master_candidate", "auto-demotion due to drain"))
2958
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2959
          msg = rrc.RemoteFailMsg()
2960
          if msg:
2961
            self.LogWarning("Node failed to demote itself: %s" % msg)
2962
        if node.offline:
2963
          node.offline = False
2964
          result.append(("offline", "clear offline status due to drain"))
2965

    
2966
    # this will trigger configuration file update, if needed
2967
    self.cfg.Update(node)
2968
    # this will trigger job queue propagation or cleanup
2969
    if changed_mc:
2970
      self.context.ReaddNode(node)
2971

    
2972
    return result
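
# Example (illustrative): Exec above returns a list of (parameter, value)
# pairs describing the changes that were applied, e.g.
#   [("offline", "True"),
#    ("master_candidate", "auto-demotion due to offline")]
# so callers can report exactly which modifications took effect.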


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Collect and return the values of the requested config fields.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values
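
# Example (illustrative): for output_fields ["cluster_name", "drain_flag"],
# Exec above returns the values in the same order, e.g.
#   ["cluster.example.com", False]
# where drain_flag reflects the presence of constants.JOB_QUEUE_DRAIN_FILE.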


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
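
# Example (illustrative): a typical caller assembles the disks and checks the
# boolean status, as LUActivateInstanceDisks.Exec does above:
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance,
#                                                  ignore_size=True)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#
# device_info then holds (node, iv_name, node-visible device) tuples for the
# primary node, one per instance disk.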


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on nodes other than the primary always make the function
  return False. Errors on the primary node do so only if
  ignore_primary is false; otherwise they are just logged.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result
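
# Example (illustrative): when the primary node may already be unreachable
# (e.g. while cleaning up after a failed node), its errors can be tolerated:
#
#   if not _ShutdownInstanceDisks(lu, instance, ignore_primary=True):
#     lu.LogWarning("Some block devices could not be shut down cleanly")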


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
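
# Example (illustrative): requiring 512 MiB free on the primary node before
# starting an instance, mirroring the call made by LUStartupInstance below:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        512, instance.hypervisor)
#
# This raises OpPrereqError if the node reports less than 512 MiB free, or if
# no usable memory information can be obtained from it.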


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
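
# Note (illustrative): INSTANCE_REBOOT_SOFT and INSTANCE_REBOOT_HARD above are
# delegated to a single call_instance_reboot RPC on the primary node, while
# INSTANCE_REBOOT_FULL is emulated as shutdown + disk restart + start, which
# is why only the full variant makes use of ignore_secondaries.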


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURecreateInstanceDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    if not isinstance(self.op.disks, list):
      raise errors.OpPrereqError("Invalid disks parameter")
    for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item))

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))
    else:
      for idx in self.op.disks:
        if idx >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    to_skip = []
    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.op.disks: # disk idx has not been passed in
        to_skip.append(idx)
        continue

    _CreateDisks(self, self.instance, to_skip=to_skip)
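
# Example (illustrative): an empty op.disks means "recreate every disk", while
# op.disks == [1] recreates only disk index 1; all other indices end up in
# to_skip and are left untouched by _CreateDisks.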


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
                                    r"(nic)\.(bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|modes|links|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",
                                    "ctime", "mtime",
                                    ] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted