#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If no nodes are to be returned, an empty list (and not None) should be
    used.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


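# Illustrative sketch, not part of the original file: a minimal LogicalUnit
# subclass following the rules listed in the class docstring above. The class
# name and the opcode's "message" parameter are hypothetical and shown as
# commented-out code only for illustration.
#
#   class LUExampleEcho(LogicalUnit):
#     HPATH = None          # no hooks are run for this LU
#     HTYPE = None
#     _OP_REQP = ["message"]
#     REQ_BGL = False       # does not need the Big Ganeti Lock
#
#     def ExpandNames(self):
#       self.needed_locks = {}  # concurrent LU, but no locks needed
#
#     def CheckPrereq(self):
#       pass                    # nothing to verify cluster-side
#
#     def Exec(self, feedback_fn):
#       feedback_fn("echo: %s" % self.op.message)
#       return self.op.message

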
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError


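# Illustrative sketch, not part of the original file: how an LU could delegate
# its work to tasklets instead of implementing CheckPrereq/Exec itself. The
# names below are hypothetical; the point is that ExpandNames fills
# self.tasklets and LogicalUnit's default CheckPrereq and Exec then iterate
# over that list.
#
#   class _ExampleNoopTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("tasklet ran on cluster %s" % self.cfg.GetClusterName())
#
#   class LUExampleTasklets(NoHooksLU):
#     _OP_REQP = []
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}
#       self.tasklets = [_ExampleNoopTasklet(self)]

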
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is of the wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


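# Illustrative sketch, not part of the original file: a hypothetical fragment
# showing how a query-style LU typically uses the helpers above (the field
# names and opcode attributes here are made up for illustration only):
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dtotal", "dfree"),
#                      selected=self.op.output_fields)
#   self.wanted = _GetWantedNodes(self, self.op.nodes)

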
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


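# Illustrative sketch, not part of the original file: for an instance with one
# bridged NIC and one disk, _BuildInstanceHookEnv returns a dict roughly like
# the following (all values are hypothetical); INSTANCE_BE_* and INSTANCE_HV_*
# entries are added from the bep and hvp dicts:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debootstrap",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_DISK_TEMPLATE": "drbd",
#     "INSTANCE_HYPERVISOR": "xen-pvm",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_NIC0_MODE": "bridged",
#     "INSTANCE_NIC0_LINK": "xen-br0",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     "INSTANCE_DISK0_MODE": "rw",
#   }

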
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance-related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

1396
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1397
    """Analyze the post-hooks' result
1398

1399
    This method analyses the hook result, handles it, and sends some
1400
    nicely-formatted feedback back to the user.
1401

1402
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1403
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1404
    @param hooks_results: the results of the multi-node hooks rpc call
1405
    @param feedback_fn: function used send feedback back to the caller
1406
    @param lu_result: previous Exec result
1407
    @return: the new Exec result, based on the previous result
1408
        and hook results
1409

1410
    """
1411
    # We only really run POST phase hooks, and are only interested in
1412
    # their results
1413
    if phase == constants.HOOKS_PHASE_POST:
1414
      # Used to change hooks' output to proper indentation
1415
      indent_re = re.compile('^', re.M)
1416
      feedback_fn("* Hooks Results")
1417
      if not hooks_results:
1418
        feedback_fn("  - ERROR: general communication failure")
1419
        lu_result = 1
1420
      else:
1421
        for node_name in hooks_results:
1422
          show_node_header = True
1423
          res = hooks_results[node_name]
1424
          msg = res.fail_msg
1425
          if msg:
1426
            if res.offline:
1427
              # no need to warn or set fail return value
1428
              continue
1429
            feedback_fn("    Communication failure in hooks execution: %s" %
1430
                        msg)
1431
            lu_result = 1
1432
            continue
1433
          for script, hkr, output in res.payload:
1434
            if hkr == constants.HKR_FAIL:
1435
              # The node header is only shown once, if there are
1436
              # failing hooks on that node
1437
              if show_node_header:
1438
                feedback_fn("  Node %s:" % node_name)
1439
                show_node_header = False
1440
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1441
              output = indent_re.sub('      ', output)
1442
              feedback_fn("%s" % output)
1443
              lu_result = 1
1444

    
1445
      return lu_result
1446

    
1447

    
1448
class LUVerifyDisks(NoHooksLU):
1449
  """Verifies the cluster disks status.
1450

1451
  """
1452
  _OP_REQP = []
1453
  REQ_BGL = False
1454

    
1455
  def ExpandNames(self):
1456
    self.needed_locks = {
1457
      locking.LEVEL_NODE: locking.ALL_SET,
1458
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1459
    }
1460
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
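    # a share_locks value of 1 requests the node and instance locks in shared
    # mode, which is enough for this read-only verification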
1461

    
1462
  def CheckPrereq(self):
1463
    """Check prerequisites.
1464

1465
    This has no prerequisites.
1466

1467
    """
1468
    pass
1469

    
1470
  def Exec(self, feedback_fn):
1471
    """Verify integrity of cluster disks.
1472

1473
    @rtype: tuple of three items
1474
    @return: a tuple of (dict of node-to-node_error, list of instances
1475
        which need activate-disks, dict of instance: (node, volume) for
1476
        missing volumes)
1477

1478
    """
1479
    result = res_nodes, res_instances, res_missing = {}, [], {}
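    # res_nodes, res_instances and res_missing alias the three members of
    # "result", so filling them in below also fills the tuple returned at the end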
1480

    
1481
    vg_name = self.cfg.GetVGName()
1482
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1483
    instances = [self.cfg.GetInstanceInfo(name)
1484
                 for name in self.cfg.GetInstanceList()]
1485

    
1486
    nv_dict = {}
1487
    for inst in instances:
1488
      inst_lvs = {}
1489
      if (not inst.admin_up or
1490
          inst.disk_template not in constants.DTS_NET_MIRROR):
1491
        continue
1492
      inst.MapLVsByNode(inst_lvs)
1493
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1494
      for node, vol_list in inst_lvs.iteritems():
1495
        for vol in vol_list:
1496
          nv_dict[(node, vol)] = inst
1497

    
1498
    if not nv_dict:
1499
      return result
1500

    
1501
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1502

    
1503
    for node in nodes:
1504
      # node_volume
1505
      node_res = node_lvs[node]
1506
      if node_res.offline:
1507
        continue
1508
      msg = node_res.fail_msg
1509
      if msg:
1510
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1511
        res_nodes[node] = msg
1512
        continue
1513

    
1514
      lvs = node_res.payload
1515
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1516
        inst = nv_dict.pop((node, lv_name), None)
1517
        if (not lv_online and inst is not None
1518
            and inst.name not in res_instances):
1519
          res_instances.append(inst.name)
1520

    
1521
    # any leftover items in nv_dict are missing LVs, let's arrange the
1522
    # data better
1523
    for key, inst in nv_dict.iteritems():
1524
      if inst.name not in res_missing:
1525
        res_missing[inst.name] = []
1526
      res_missing[inst.name].append(key)
1527

    
1528
    return result
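    # Illustrative example of the returned tuple (hypothetical names):
    #   ({"node2": "error enumerating LVs: ..."},         # per-node errors
    #    ["instance3"],                                    # need activate-disks
    #    {"instance5": [("node1", "xenvg/disk0_data")]})   # missing (node, LV) pairs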
1529

    
1530

    
1531
class LURepairDiskSizes(NoHooksLU):
1532
  """Verifies the cluster disks sizes.
1533

1534
  """
1535
  _OP_REQP = ["instances"]
1536
  REQ_BGL = False
1537

    
1538
  def ExpandNames(self):
1539

    
1540
    if not isinstance(self.op.instances, list):
1541
      raise errors.OpPrereqError("Invalid argument type 'instances'")
1542

    
1543
    if self.op.instances:
1544
      self.wanted_names = []
1545
      for name in self.op.instances:
1546
        full_name = self.cfg.ExpandInstanceName(name)
1547
        if full_name is None:
1548
          raise errors.OpPrereqError("Instance '%s' not known" % name)
1549
        self.wanted_names.append(full_name)
1550
      self.needed_locks = {
1552
        locking.LEVEL_NODE: [],
1553
        locking.LEVEL_INSTANCE: self.wanted_names,
1554
        }
1555
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1556
    else:
1557
      self.wanted_names = None
1558
      self.needed_locks = {
1559
        locking.LEVEL_NODE: locking.ALL_SET,
1560
        locking.LEVEL_INSTANCE: locking.ALL_SET,
1561
        }
1562
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1563

    
1564
  def DeclareLocks(self, level):
1565
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
1566
      self._LockInstancesNodes(primary_only=True)
1567

    
1568
  def CheckPrereq(self):
1569
    """Check prerequisites.
1570

1571
    This only checks the optional instance list against the existing names.
1572

1573
    """
1574
    if self.wanted_names is None:
1575
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1576

    
1577
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1578
                             in self.wanted_names]
1579

    
1580
  def Exec(self, feedback_fn):
1581
    """Verify the size of cluster disks.
1582

1583
    """
1584
    # TODO: check child disks too
1585
    # TODO: check differences in size between primary/secondary nodes
1586
    per_node_disks = {}
1587
    for instance in self.wanted_instances:
1588
      pnode = instance.primary_node
1589
      if pnode not in per_node_disks:
1590
        per_node_disks[pnode] = []
1591
      for idx, disk in enumerate(instance.disks):
1592
        per_node_disks[pnode].append((instance, idx, disk))
1593

    
1594
    changed = []
1595
    for node, dskl in per_node_disks.items():
1596
      result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1597
      if result.failed:
1598
        self.LogWarning("Failure in blockdev_getsizes call to node"
1599
                        " %s, ignoring", node)
1600
        continue
1601
      if len(result.data) != len(dskl):
1602
        self.LogWarning("Invalid result from node %s, ignoring node results",
1603
                        node)
1604
        continue
1605
      for ((instance, idx, disk), size) in zip(dskl, result.data):
1606
        if size is None:
1607
          self.LogWarning("Disk %d of instance %s did not return size"
1608
                          " information, ignoring", idx, instance.name)
1609
          continue
1610
        if not isinstance(size, (int, long)):
1611
          self.LogWarning("Disk %d of instance %s did not return valid"
1612
                          " size information, ignoring", idx, instance.name)
1613
          continue
1614
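        # blockdev_getsizes reports sizes in bytes; shift to MiB, the unit in
        # which disk.size is stored in the configuration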
        size = size >> 20
1615
        if size != disk.size:
1616
          self.LogInfo("Disk %d of instance %s has mismatched size,"
1617
                       " correcting: recorded %d, actual %d", idx,
1618
                       instance.name, disk.size, size)
1619
          disk.size = size
1620
          self.cfg.Update(instance)
1621
          changed.append((instance.name, idx, size))
1622
    return changed
1623

    
1624

    
1625
class LURenameCluster(LogicalUnit):
1626
  """Rename the cluster.
1627

1628
  """
1629
  HPATH = "cluster-rename"
1630
  HTYPE = constants.HTYPE_CLUSTER
1631
  _OP_REQP = ["name"]
1632

    
1633
  def BuildHooksEnv(self):
1634
    """Build hooks env.
1635

1636
    """
1637
    env = {
1638
      "OP_TARGET": self.cfg.GetClusterName(),
1639
      "NEW_NAME": self.op.name,
1640
      }
1641
    mn = self.cfg.GetMasterNode()
1642
    return env, [mn], [mn]
1643

    
1644
  def CheckPrereq(self):
1645
    """Verify that the passed name is a valid one.
1646

1647
    """
1648
    hostname = utils.HostInfo(self.op.name)
1649

    
1650
    new_name = hostname.name
1651
    self.ip = new_ip = hostname.ip
1652
    old_name = self.cfg.GetClusterName()
1653
    old_ip = self.cfg.GetMasterIP()
1654
    if new_name == old_name and new_ip == old_ip:
1655
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1656
                                 " cluster has changed")
1657
    if new_ip != old_ip:
1658
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1659
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1660
                                   " reachable on the network. Aborting." %
1661
                                   new_ip)
1662

    
1663
    self.op.name = new_name
1664

    
1665
  def Exec(self, feedback_fn):
1666
    """Rename the cluster.
1667

1668
    """
1669
    clustername = self.op.name
1670
    ip = self.ip
1671

    
1672
    # shutdown the master IP
1673
    master = self.cfg.GetMasterNode()
1674
    result = self.rpc.call_node_stop_master(master, False)
1675
    result.Raise("Could not disable the master role")
1676

    
1677
    try:
1678
      cluster = self.cfg.GetClusterInfo()
1679
      cluster.cluster_name = clustername
1680
      cluster.master_ip = ip
1681
      self.cfg.Update(cluster)
1682

    
1683
      # update the known hosts file
1684
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1685
      node_list = self.cfg.GetNodeList()
1686
      try:
1687
        node_list.remove(master)
1688
      except ValueError:
1689
        pass
1690
      result = self.rpc.call_upload_file(node_list,
1691
                                         constants.SSH_KNOWN_HOSTS_FILE)
1692
      for to_node, to_result in result.iteritems():
1693
        msg = to_result.fail_msg
1694
        if msg:
1695
          msg = ("Copy of file %s to node %s failed: %s" %
1696
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1697
          self.proc.LogWarning(msg)
1698

    
1699
    finally:
1700
      result = self.rpc.call_node_start_master(master, False, False)
1701
      msg = result.fail_msg
1702
      if msg:
1703
        self.LogWarning("Could not re-enable the master role on"
1704
                        " the master, please restart manually: %s", msg)
1705

    
1706

    
1707
def _RecursiveCheckIfLVMBased(disk):
1708
  """Check if the given disk or its children are lvm-based.
1709

1710
  @type disk: L{objects.Disk}
1711
  @param disk: the disk to check
1712
  @rtype: boolean
1713
  @return: boolean indicating whether a LD_LV dev_type was found or not
1714

1715
  """
1716
  if disk.children:
1717
    for chdisk in disk.children:
1718
      if _RecursiveCheckIfLVMBased(chdisk):
1719
        return True
1720
  return disk.dev_type == constants.LD_LV
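# Example (hypothetical layout): for a DRBD8 disk whose children are two plain
# LVs, the recursion reaches a child with dev_type == constants.LD_LV and the
# function returns True; a file-based disk with no children returns False.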
1721

    
1722

    
1723
class LUSetClusterParams(LogicalUnit):
1724
  """Change the parameters of the cluster.
1725

1726
  """
1727
  HPATH = "cluster-modify"
1728
  HTYPE = constants.HTYPE_CLUSTER
1729
  _OP_REQP = []
1730
  REQ_BGL = False
1731

    
1732
  def CheckArguments(self):
1733
    """Check parameters
1734

1735
    """
1736
    if not hasattr(self.op, "candidate_pool_size"):
1737
      self.op.candidate_pool_size = None
1738
    if self.op.candidate_pool_size is not None:
1739
      try:
1740
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1741
      except (ValueError, TypeError), err:
1742
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1743
                                   str(err))
1744
      if self.op.candidate_pool_size < 1:
1745
        raise errors.OpPrereqError("At least one master candidate needed")
1746

    
1747
  def ExpandNames(self):
1748
    # FIXME: in the future maybe other cluster params won't require checking on
1749
    # all nodes to be modified.
1750
    self.needed_locks = {
1751
      locking.LEVEL_NODE: locking.ALL_SET,
1752
    }
1753
    self.share_locks[locking.LEVEL_NODE] = 1
1754

    
1755
  def BuildHooksEnv(self):
1756
    """Build hooks env.
1757

1758
    """
1759
    env = {
1760
      "OP_TARGET": self.cfg.GetClusterName(),
1761
      "NEW_VG_NAME": self.op.vg_name,
1762
      }
1763
    mn = self.cfg.GetMasterNode()
1764
    return env, [mn], [mn]
1765

    
1766
  def CheckPrereq(self):
1767
    """Check prerequisites.
1768

1769
    This checks whether the given params don't conflict and
1770
    if the given volume group is valid.
1771

1772
    """
1773
    if self.op.vg_name is not None and not self.op.vg_name:
1774
      instances = self.cfg.GetAllInstancesInfo().values()
1775
      for inst in instances:
1776
        for disk in inst.disks:
1777
          if _RecursiveCheckIfLVMBased(disk):
1778
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1779
                                       " lvm-based instances exist")
1780

    
1781
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1782

    
1783
    # if vg_name not None, checks given volume group on all nodes
1784
    if self.op.vg_name:
1785
      vglist = self.rpc.call_vg_list(node_list)
1786
      for node in node_list:
1787
        msg = vglist[node].fail_msg
1788
        if msg:
1789
          # ignoring down node
1790
          self.LogWarning("Error while gathering data on node %s"
1791
                          " (ignoring node): %s", node, msg)
1792
          continue
1793
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1794
                                              self.op.vg_name,
1795
                                              constants.MIN_VG_SIZE)
1796
        if vgstatus:
1797
          raise errors.OpPrereqError("Error on node '%s': %s" %
1798
                                     (node, vgstatus))
1799

    
1800
    self.cluster = cluster = self.cfg.GetClusterInfo()
1801
    # validate params changes
1802
    if self.op.beparams:
1803
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1804
      self.new_beparams = objects.FillDict(
1805
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1806

    
1807
    if self.op.nicparams:
1808
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1809
      self.new_nicparams = objects.FillDict(
1810
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1811
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1812

    
1813
    # hypervisor list/parameters
1814
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1815
    if self.op.hvparams:
1816
      if not isinstance(self.op.hvparams, dict):
1817
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1818
      for hv_name, hv_dict in self.op.hvparams.items():
1819
        if hv_name not in self.new_hvparams:
1820
          self.new_hvparams[hv_name] = hv_dict
1821
        else:
1822
          self.new_hvparams[hv_name].update(hv_dict)
1823

    
1824
    if self.op.enabled_hypervisors is not None:
1825
      self.hv_list = self.op.enabled_hypervisors
1826
      if not self.hv_list:
1827
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1828
                                   " least one member")
1829
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1830
      if invalid_hvs:
1831
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1832
                                   " entries: %s" %
1833
                                   utils.CommaJoin(invalid_hvs))
1834
    else:
1835
      self.hv_list = cluster.enabled_hypervisors
1836

    
1837
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1838
      # either the enabled list has changed, or the parameters have, validate
1839
      for hv_name, hv_params in self.new_hvparams.items():
1840
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1841
            (self.op.enabled_hypervisors and
1842
             hv_name in self.op.enabled_hypervisors)):
1843
          # either this is a new hypervisor, or its parameters have changed
1844
          hv_class = hypervisor.GetHypervisor(hv_name)
1845
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1846
          hv_class.CheckParameterSyntax(hv_params)
1847
          _CheckHVParams(self, node_list, hv_name, hv_params)
1848

    
1849
  def Exec(self, feedback_fn):
1850
    """Change the parameters of the cluster.
1851

1852
    """
1853
    if self.op.vg_name is not None:
1854
      new_volume = self.op.vg_name
1855
      if not new_volume:
1856
        new_volume = None
1857
      if new_volume != self.cfg.GetVGName():
1858
        self.cfg.SetVGName(new_volume)
1859
      else:
1860
        feedback_fn("Cluster LVM configuration already in desired"
1861
                    " state, not changing")
1862
    if self.op.hvparams:
1863
      self.cluster.hvparams = self.new_hvparams
1864
    if self.op.enabled_hypervisors is not None:
1865
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1866
    if self.op.beparams:
1867
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1868
    if self.op.nicparams:
1869
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1870

    
1871
    if self.op.candidate_pool_size is not None:
1872
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1873
      # we need to update the pool size here, otherwise the save will fail
1874
      _AdjustCandidatePool(self)
1875

    
1876
    self.cfg.Update(self.cluster)
1877

    
1878

    
1879
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1880
  """Distribute additional files which are part of the cluster configuration.
1881

1882
  ConfigWriter takes care of distributing the config and ssconf files, but
1883
  there are more files which should be distributed to all nodes. This function
1884
  makes sure those are copied.
1885

1886
  @param lu: calling logical unit
1887
  @param additional_nodes: list of nodes not in the config to distribute to
1888

1889
  """
1890
  # 1. Gather target nodes
1891
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1892
  dist_nodes = lu.cfg.GetNodeList()
1893
  if additional_nodes is not None:
1894
    dist_nodes.extend(additional_nodes)
1895
  if myself.name in dist_nodes:
1896
    dist_nodes.remove(myself.name)
1897
  # 2. Gather files to distribute
1898
  dist_files = set([constants.ETC_HOSTS,
1899
                    constants.SSH_KNOWN_HOSTS_FILE,
1900
                    constants.RAPI_CERT_FILE,
1901
                    constants.RAPI_USERS_FILE,
1902
                    constants.HMAC_CLUSTER_KEY,
1903
                   ])
1904

    
1905
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1906
  for hv_name in enabled_hypervisors:
1907
    hv_class = hypervisor.GetHypervisor(hv_name)
1908
    dist_files.update(hv_class.GetAncillaryFiles())
1909

    
1910
  # 3. Perform the files upload
1911
  for fname in dist_files:
1912
    if os.path.exists(fname):
1913
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1914
      for to_node, to_result in result.items():
1915
        msg = to_result.fail_msg
1916
        if msg:
1917
          msg = ("Copy of file %s to node %s failed: %s" %
1918
                 (fname, to_node, msg))
1919
          lu.proc.LogWarning(msg)
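# Typical invocations, as seen further down in this module: LURedistributeConfig
# calls _RedistributeAncillaryFiles(self) to re-sync all configured nodes, while
# LUAddNode passes additional_nodes=[node] so that the node being added, which
# is not yet in the configuration, receives the files as well.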
1920

    
1921

    
1922
class LURedistributeConfig(NoHooksLU):
1923
  """Force the redistribution of cluster configuration.
1924

1925
  This is a very simple LU.
1926

1927
  """
1928
  _OP_REQP = []
1929
  REQ_BGL = False
1930

    
1931
  def ExpandNames(self):
1932
    self.needed_locks = {
1933
      locking.LEVEL_NODE: locking.ALL_SET,
1934
    }
1935
    self.share_locks[locking.LEVEL_NODE] = 1
1936

    
1937
  def CheckPrereq(self):
1938
    """Check prerequisites.
1939

1940
    """
1941

    
1942
  def Exec(self, feedback_fn):
1943
    """Redistribute the configuration.
1944

1945
    """
1946
    self.cfg.Update(self.cfg.GetClusterInfo())
1947
    _RedistributeAncillaryFiles(self)
1948

    
1949

    
1950
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1951
  """Sleep and poll for an instance's disk to sync.
1952

1953
  """
1954
  if not instance.disks:
1955
    return True
1956

    
1957
  if not oneshot:
1958
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1959

    
1960
  node = instance.primary_node
1961

    
1962
  for dev in instance.disks:
1963
    lu.cfg.SetDiskID(dev, node)
1964

    
1965
  retries = 0
1966
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1967
  while True:
1968
    max_time = 0
1969
    done = True
1970
    cumul_degraded = False
1971
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1972
    msg = rstats.fail_msg
1973
    if msg:
1974
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1975
      retries += 1
1976
      if retries >= 10:
1977
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1978
                                 " aborting." % node)
1979
      time.sleep(6)
1980
      continue
1981
    rstats = rstats.payload
1982
    retries = 0
1983
    for i, mstat in enumerate(rstats):
1984
      if mstat is None:
1985
        lu.LogWarning("Can't compute data for node %s/%s",
1986
                           node, instance.disks[i].iv_name)
1987
        continue
1988

    
1989
      cumul_degraded = (cumul_degraded or
1990
                        (mstat.is_degraded and mstat.sync_percent is None))
1991
      if mstat.sync_percent is not None:
1992
        done = False
1993
        if mstat.estimated_time is not None:
1994
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
1995
          max_time = mstat.estimated_time
1996
        else:
1997
          rem_time = "no time estimate"
1998
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1999
                        (instance.disks[i].iv_name, mstat.sync_percent, rem_time))
2000

    
2001
    # if we're done but degraded, let's do a few small retries, to
2002
    # make sure we see a stable and not transient situation; therefore
2003
    # we force restart of the loop
2004
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
2005
      logging.info("Degraded disks found, %d retries left", degr_retries)
2006
      degr_retries -= 1
2007
      time.sleep(1)
2008
      continue
2009

    
2010
    if done or oneshot:
2011
      break
2012

    
2013
    time.sleep(min(60, max_time))
2014

    
2015
  if done:
2016
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2017
  return not cumul_degraded
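# Summary of the polling behaviour implemented above: RPC failures are retried
# with a 6 second pause and the wait aborts after ten consecutive failures;
# disks that look done-but-degraded get up to 10 extra one-second re-checks;
# otherwise the loop sleeps min(60, estimated_time) seconds between polls.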
2018

    
2019

    
2020
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2021
  """Check that mirrors are not degraded.
2022

2023
  The ldisk parameter, if True, will change the test from the
2024
  is_degraded attribute (which represents overall non-ok status for
2025
  the device(s)) to the ldisk (representing the local storage status).
2026

2027
  """
2028
  lu.cfg.SetDiskID(dev, node)
2029

    
2030
  result = True
2031

    
2032
  if on_primary or dev.AssembleOnSecondary():
2033
    rstats = lu.rpc.call_blockdev_find(node, dev)
2034
    msg = rstats.fail_msg
2035
    if msg:
2036
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2037
      result = False
2038
    elif not rstats.payload:
2039
      lu.LogWarning("Can't find disk on node %s", node)
2040
      result = False
2041
    else:
2042
      if ldisk:
2043
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2044
      else:
2045
        result = result and not rstats.payload.is_degraded
2046

    
2047
  if dev.children:
2048
    for child in dev.children:
2049
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2050

    
2051
  return result
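# Note: the recursive call on dev.children above uses the default ldisk=False,
# so child devices are always checked through their is_degraded status.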
2052

    
2053

    
2054
class LUDiagnoseOS(NoHooksLU):
2055
  """Logical unit for OS diagnose/query.
2056

2057
  """
2058
  _OP_REQP = ["output_fields", "names"]
2059
  REQ_BGL = False
2060
  _FIELDS_STATIC = utils.FieldSet()
2061
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2062

    
2063
  def ExpandNames(self):
2064
    if self.op.names:
2065
      raise errors.OpPrereqError("Selective OS query not supported")
2066

    
2067
    _CheckOutputFields(static=self._FIELDS_STATIC,
2068
                       dynamic=self._FIELDS_DYNAMIC,
2069
                       selected=self.op.output_fields)
2070

    
2071
    # Lock all nodes, in shared mode
2072
    # Temporary removal of locks, should be reverted later
2073
    # TODO: reintroduce locks when they are lighter-weight
2074
    self.needed_locks = {}
2075
    #self.share_locks[locking.LEVEL_NODE] = 1
2076
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2077

    
2078
  def CheckPrereq(self):
2079
    """Check prerequisites.
2080

2081
    """
2082

    
2083
  @staticmethod
2084
  def _DiagnoseByOS(node_list, rlist):
2085
    """Remaps a per-node return list into an a per-os per-node dictionary
2086

2087
    @param node_list: a list with the names of all nodes
2088
    @param rlist: a map with node names as keys and OS objects as values
2089

2090
    @rtype: dict
2091
    @return: a dictionary with osnames as keys and as value another map, with
2092
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
2093

2094
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2095
                                     (/srv/..., False, "invalid api")],
2096
                           "node2": [(/srv/..., True, "")]}
2097
          }
2098

2099
    """
2100
    all_os = {}
2101
    # we build here the list of nodes that didn't fail the RPC (at RPC
2102
    # level), so that nodes with a non-responding node daemon don't
2103
    # make all OSes invalid
2104
    good_nodes = [node_name for node_name in rlist
2105
                  if not rlist[node_name].fail_msg]
2106
    for node_name, nr in rlist.items():
2107
      if nr.fail_msg or not nr.payload:
2108
        continue
2109
      for name, path, status, diagnose in nr.payload:
2110
        if name not in all_os:
2111
          # build a list of nodes for this os containing empty lists
2112
          # for each node in node_list
2113
          all_os[name] = {}
2114
          for nname in good_nodes:
2115
            all_os[name][nname] = []
2116
        all_os[name][node_name].append((path, status, diagnose))
2117
    return all_os
2118

    
2119
  def Exec(self, feedback_fn):
2120
    """Compute the list of OSes.
2121

2122
    """
2123
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2124
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2125
    pol = self._DiagnoseByOS(valid_nodes, node_data)
2126
    output = []
2127
    for os_name, os_data in pol.items():
2128
      row = []
2129
      for field in self.op.output_fields:
2130
        if field == "name":
2131
          val = os_name
2132
        elif field == "valid":
2133
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2134
        elif field == "node_status":
2135
          # this is just a copy of the dict
2136
          val = {}
2137
          for node_name, nos_list in os_data.items():
2138
            val[node_name] = nos_list
2139
        else:
2140
          raise errors.ParameterError(field)
2141
        row.append(val)
2142
      output.append(row)
2143

    
2144
    return output
2145

    
2146

    
2147
class LURemoveNode(LogicalUnit):
2148
  """Logical unit for removing a node.
2149

2150
  """
2151
  HPATH = "node-remove"
2152
  HTYPE = constants.HTYPE_NODE
2153
  _OP_REQP = ["node_name"]
2154

    
2155
  def BuildHooksEnv(self):
2156
    """Build hooks env.
2157

2158
    This doesn't run on the target node in the pre phase as a failed
2159
    node would then be impossible to remove.
2160

2161
    """
2162
    env = {
2163
      "OP_TARGET": self.op.node_name,
2164
      "NODE_NAME": self.op.node_name,
2165
      }
2166
    all_nodes = self.cfg.GetNodeList()
2167
    if self.op.node_name in all_nodes:
2168
      all_nodes.remove(self.op.node_name)
2169
    return env, all_nodes, all_nodes
2170

    
2171
  def CheckPrereq(self):
2172
    """Check prerequisites.
2173

2174
    This checks:
2175
     - the node exists in the configuration
2176
     - it does not have primary or secondary instances
2177
     - it's not the master
2178

2179
    Any errors are signaled by raising errors.OpPrereqError.
2180

2181
    """
2182
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2183
    if node is None:
2184
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2185

    
2186
    instance_list = self.cfg.GetInstanceList()
2187

    
2188
    masternode = self.cfg.GetMasterNode()
2189
    if node.name == masternode:
2190
      raise errors.OpPrereqError("Node is the master node,"
2191
                                 " you need to failover first.")
2192

    
2193
    for instance_name in instance_list:
2194
      instance = self.cfg.GetInstanceInfo(instance_name)
2195
      if node.name in instance.all_nodes:
2196
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2197
                                   " please remove first." % instance_name)
2198
    self.op.node_name = node.name
2199
    self.node = node
2200

    
2201
  def Exec(self, feedback_fn):
2202
    """Removes the node from the cluster.
2203

2204
    """
2205
    node = self.node
2206
    logging.info("Stopping the node daemon and removing configs from node %s",
2207
                 node.name)
2208

    
2209
    self.context.RemoveNode(node.name)
2210

    
2211
    # Run post hooks on the node before it's removed
2212
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2213
    try:
2214
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2215
    except Exception:
2216
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2217

    
2218
    result = self.rpc.call_node_leave_cluster(node.name)
2219
    msg = result.fail_msg
2220
    if msg:
2221
      self.LogWarning("Errors encountered on the remote node while leaving"
2222
                      " the cluster: %s", msg)
2223

    
2224
    # Promote nodes to master candidate as needed
2225
    _AdjustCandidatePool(self)
2226

    
2227

    
2228
class LUQueryNodes(NoHooksLU):
2229
  """Logical unit for querying nodes.
2230

2231
  """
2232
  _OP_REQP = ["output_fields", "names", "use_locking"]
2233
  REQ_BGL = False
2234
  _FIELDS_DYNAMIC = utils.FieldSet(
2235
    "dtotal", "dfree",
2236
    "mtotal", "mnode", "mfree",
2237
    "bootid",
2238
    "ctotal", "cnodes", "csockets",
2239
    )
2240

    
2241
  _FIELDS_STATIC = utils.FieldSet(
2242
    "name", "pinst_cnt", "sinst_cnt",
2243
    "pinst_list", "sinst_list",
2244
    "pip", "sip", "tags",
2245
    "serial_no", "ctime", "mtime",
2246
    "master_candidate",
2247
    "master",
2248
    "offline",
2249
    "drained",
2250
    "role",
2251
    )
2252

    
2253
  def ExpandNames(self):
2254
    _CheckOutputFields(static=self._FIELDS_STATIC,
2255
                       dynamic=self._FIELDS_DYNAMIC,
2256
                       selected=self.op.output_fields)
2257

    
2258
    self.needed_locks = {}
2259
    self.share_locks[locking.LEVEL_NODE] = 1
2260

    
2261
    if self.op.names:
2262
      self.wanted = _GetWantedNodes(self, self.op.names)
2263
    else:
2264
      self.wanted = locking.ALL_SET
2265

    
2266
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2267
    self.do_locking = self.do_node_query and self.op.use_locking
2268
    if self.do_locking:
2269
      # if we don't request only static fields, we need to lock the nodes
2270
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2271

    
2272

    
2273
  def CheckPrereq(self):
2274
    """Check prerequisites.
2275

2276
    """
2277
    # The validation of the node list is done in the _GetWantedNodes,
2278
    # if non empty, and if empty, there's no validation to do
2279
    pass
2280

    
2281
  def Exec(self, feedback_fn):
2282
    """Computes the list of nodes and their attributes.
2283

2284
    """
2285
    all_info = self.cfg.GetAllNodesInfo()
2286
    if self.do_locking:
2287
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2288
    elif self.wanted != locking.ALL_SET:
2289
      nodenames = self.wanted
2290
      missing = set(nodenames).difference(all_info.keys())
2291
      if missing:
2292
        raise errors.OpExecError(
2293
          "Some nodes were removed before retrieving their data: %s" % missing)
2294
    else:
2295
      nodenames = all_info.keys()
2296

    
2297
    nodenames = utils.NiceSort(nodenames)
2298
    nodelist = [all_info[name] for name in nodenames]
2299

    
2300
    # begin data gathering
2301

    
2302
    if self.do_node_query:
2303
      live_data = {}
2304
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2305
                                          self.cfg.GetHypervisorType())
2306
      for name in nodenames:
2307
        nodeinfo = node_data[name]
2308
        if not nodeinfo.fail_msg and nodeinfo.payload:
2309
          nodeinfo = nodeinfo.payload
2310
          fn = utils.TryConvert
2311
          live_data[name] = {
2312
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2313
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2314
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2315
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2316
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2317
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2318
            "bootid": nodeinfo.get('bootid', None),
2319
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2320
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2321
            }
2322
        else:
2323
          live_data[name] = {}
2324
    else:
2325
      live_data = dict.fromkeys(nodenames, {})
2326

    
2327
    node_to_primary = dict([(name, set()) for name in nodenames])
2328
    node_to_secondary = dict([(name, set()) for name in nodenames])
2329

    
2330
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2331
                             "sinst_cnt", "sinst_list"))
2332
    if inst_fields & frozenset(self.op.output_fields):
2333
      instancelist = self.cfg.GetInstanceList()
2334

    
2335
      for instance_name in instancelist:
2336
        inst = self.cfg.GetInstanceInfo(instance_name)
2337
        if inst.primary_node in node_to_primary:
2338
          node_to_primary[inst.primary_node].add(inst.name)
2339
        for secnode in inst.secondary_nodes:
2340
          if secnode in node_to_secondary:
2341
            node_to_secondary[secnode].add(inst.name)
2342

    
2343
    master_node = self.cfg.GetMasterNode()
2344

    
2345
    # end data gathering
2346

    
2347
    output = []
2348
    for node in nodelist:
2349
      node_output = []
2350
      for field in self.op.output_fields:
2351
        if field == "name":
2352
          val = node.name
2353
        elif field == "pinst_list":
2354
          val = list(node_to_primary[node.name])
2355
        elif field == "sinst_list":
2356
          val = list(node_to_secondary[node.name])
2357
        elif field == "pinst_cnt":
2358
          val = len(node_to_primary[node.name])
2359
        elif field == "sinst_cnt":
2360
          val = len(node_to_secondary[node.name])
2361
        elif field == "pip":
2362
          val = node.primary_ip
2363
        elif field == "sip":
2364
          val = node.secondary_ip
2365
        elif field == "tags":
2366
          val = list(node.GetTags())
2367
        elif field == "serial_no":
2368
          val = node.serial_no
2369
        elif field == "ctime":
2370
          val = node.ctime
2371
        elif field == "mtime":
2372
          val = node.mtime
2373
        elif field == "master_candidate":
2374
          val = node.master_candidate
2375
        elif field == "master":
2376
          val = node.name == master_node
2377
        elif field == "offline":
2378
          val = node.offline
2379
        elif field == "drained":
2380
          val = node.drained
2381
        elif self._FIELDS_DYNAMIC.Matches(field):
2382
          val = live_data[node.name].get(field, None)
2383
        elif field == "role":
2384
          if node.name == master_node:
2385
            val = "M"
2386
          elif node.master_candidate:
2387
            val = "C"
2388
          elif node.drained:
2389
            val = "D"
2390
          elif node.offline:
2391
            val = "O"
2392
          else:
2393
            val = "R"
2394
        else:
2395
          raise errors.ParameterError(field)
2396
        node_output.append(val)
2397
      output.append(node_output)
2398

    
2399
    return output
2400

    
2401

    
2402
class LUQueryNodeVolumes(NoHooksLU):
2403
  """Logical unit for getting volumes on node(s).
2404

2405
  """
2406
  _OP_REQP = ["nodes", "output_fields"]
2407
  REQ_BGL = False
2408
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2409
  _FIELDS_STATIC = utils.FieldSet("node")
2410

    
2411
  def ExpandNames(self):
2412
    _CheckOutputFields(static=self._FIELDS_STATIC,
2413
                       dynamic=self._FIELDS_DYNAMIC,
2414
                       selected=self.op.output_fields)
2415

    
2416
    self.needed_locks = {}
2417
    self.share_locks[locking.LEVEL_NODE] = 1
2418
    if not self.op.nodes:
2419
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2420
    else:
2421
      self.needed_locks[locking.LEVEL_NODE] = \
2422
        _GetWantedNodes(self, self.op.nodes)
2423

    
2424
  def CheckPrereq(self):
2425
    """Check prerequisites.
2426

2427
    This checks that the fields required are valid output fields.
2428

2429
    """
2430
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2431

    
2432
  def Exec(self, feedback_fn):
2433
    """Computes the list of nodes and their attributes.
2434

2435
    """
2436
    nodenames = self.nodes
2437
    volumes = self.rpc.call_node_volumes(nodenames)
2438

    
2439
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2440
             in self.cfg.GetInstanceList()]
2441

    
2442
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2443

    
2444
    output = []
2445
    for node in nodenames:
2446
      nresult = volumes[node]
2447
      if nresult.offline:
2448
        continue
2449
      msg = nresult.fail_msg
2450
      if msg:
2451
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2452
        continue
2453

    
2454
      node_vols = nresult.payload[:]
2455
      node_vols.sort(key=lambda vol: vol['dev'])
2456

    
2457
      for vol in node_vols:
2458
        node_output = []
2459
        for field in self.op.output_fields:
2460
          if field == "node":
2461
            val = node
2462
          elif field == "phys":
2463
            val = vol['dev']
2464
          elif field == "vg":
2465
            val = vol['vg']
2466
          elif field == "name":
2467
            val = vol['name']
2468
          elif field == "size":
2469
            val = int(float(vol['size']))
2470
          elif field == "instance":
2471
            for inst in ilist:
2472
              if node not in lv_by_node[inst]:
2473
                continue
2474
              if vol['name'] in lv_by_node[inst][node]:
2475
                val = inst.name
2476
                break
2477
            else:
2478
              val = '-'
2479
          else:
2480
            raise errors.ParameterError(field)
2481
          node_output.append(str(val))
2482

    
2483
        output.append(node_output)
2484

    
2485
    return output
2486

    
2487

    
2488
class LUQueryNodeStorage(NoHooksLU):
2489
  """Logical unit for getting information on storage units on node(s).
2490

2491
  """
2492
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
2493
  REQ_BGL = False
2494
  _FIELDS_STATIC = utils.FieldSet("node")
2495

    
2496
  def ExpandNames(self):
2497
    storage_type = self.op.storage_type
2498

    
2499
    if storage_type not in constants.VALID_STORAGE_FIELDS:
2500
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2501

    
2502
    dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2503

    
2504
    _CheckOutputFields(static=self._FIELDS_STATIC,
2505
                       dynamic=utils.FieldSet(*dynamic_fields),
2506
                       selected=self.op.output_fields)
2507

    
2508
    self.needed_locks = {}
2509
    self.share_locks[locking.LEVEL_NODE] = 1
2510

    
2511
    if self.op.nodes:
2512
      self.needed_locks[locking.LEVEL_NODE] = \
2513
        _GetWantedNodes(self, self.op.nodes)
2514
    else:
2515
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2516

    
2517
  def CheckPrereq(self):
2518
    """Check prerequisites.
2519

2520
    This checks that the fields required are valid output fields.
2521

2522
    """
2523
    self.op.name = getattr(self.op, "name", None)
2524

    
2525
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2526

    
2527
  def Exec(self, feedback_fn):
2528
    """Computes the list of nodes and their attributes.
2529

2530
    """
2531
    # Always get name to sort by
2532
    if constants.SF_NAME in self.op.output_fields:
2533
      fields = self.op.output_fields[:]
2534
    else:
2535
      fields = [constants.SF_NAME] + self.op.output_fields
2536

    
2537
    # Never ask for node as it's only known to the LU
2538
    while "node" in fields:
2539
      fields.remove("node")
2540

    
2541
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2542
    name_idx = field_idx[constants.SF_NAME]
2543

    
2544
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2545
    data = self.rpc.call_storage_list(self.nodes,
2546
                                      self.op.storage_type, st_args,
2547
                                      self.op.name, fields)
2548

    
2549
    result = []
2550

    
2551
    for node in utils.NiceSort(self.nodes):
2552
      nresult = data[node]
2553
      if nresult.offline:
2554
        continue
2555

    
2556
      msg = nresult.fail_msg
2557
      if msg:
2558
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2559
        continue
2560

    
2561
      rows = dict([(row[name_idx], row) for row in nresult.payload])
2562

    
2563
      for name in utils.NiceSort(rows.keys()):
2564
        row = rows[name]
2565

    
2566
        out = []
2567

    
2568
        for field in self.op.output_fields:
2569
          if field == "node":
2570
            val = node
2571
          elif field in field_idx:
2572
            val = row[field_idx[field]]
2573
          else:
2574
            raise errors.ParameterError(field)
2575

    
2576
          out.append(val)
2577

    
2578
        result.append(out)
2579

    
2580
    return result
2581

    
2582

    
2583
class LUModifyNodeStorage(NoHooksLU):
2584
  """Logical unit for modifying a storage volume on a node.
2585

2586
  """
2587
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2588
  REQ_BGL = False
2589

    
2590
  def CheckArguments(self):
2591
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2592
    if node_name is None:
2593
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2594

    
2595
    self.op.node_name = node_name
2596

    
2597
    storage_type = self.op.storage_type
2598
    if storage_type not in constants.VALID_STORAGE_FIELDS:
2599
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2600

    
2601
  def ExpandNames(self):
2602
    self.needed_locks = {
2603
      locking.LEVEL_NODE: self.op.node_name,
2604
      }
2605

    
2606
  def CheckPrereq(self):
2607
    """Check prerequisites.
2608

2609
    """
2610
    storage_type = self.op.storage_type
2611

    
2612
    try:
2613
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2614
    except KeyError:
2615
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
2616
                                 " modified" % storage_type)
2617

    
2618
    diff = set(self.op.changes.keys()) - modifiable
2619
    if diff:
2620
      raise errors.OpPrereqError("The following fields can not be modified for"
2621
                                 " storage units of type '%s': %r" %
2622
                                 (storage_type, list(diff)))
2623

    
2624
  def Exec(self, feedback_fn):
2625
    """Computes the list of nodes and their attributes.
2626

2627
    """
2628
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2629
    result = self.rpc.call_storage_modify(self.op.node_name,
2630
                                          self.op.storage_type, st_args,
2631
                                          self.op.name, self.op.changes)
2632
    result.Raise("Failed to modify storage unit '%s' on %s" %
2633
                 (self.op.name, self.op.node_name))
2634

    
2635

    
2636
class LUAddNode(LogicalUnit):
2637
  """Logical unit for adding node to the cluster.
2638

2639
  """
2640
  HPATH = "node-add"
2641
  HTYPE = constants.HTYPE_NODE
2642
  _OP_REQP = ["node_name"]
2643

    
2644
  def BuildHooksEnv(self):
2645
    """Build hooks env.
2646

2647
    This will run on all nodes before, and on all nodes + the new node after.
2648

2649
    """
2650
    env = {
2651
      "OP_TARGET": self.op.node_name,
2652
      "NODE_NAME": self.op.node_name,
2653
      "NODE_PIP": self.op.primary_ip,
2654
      "NODE_SIP": self.op.secondary_ip,
2655
      }
2656
    nodes_0 = self.cfg.GetNodeList()
2657
    nodes_1 = nodes_0 + [self.op.node_name, ]
2658
    return env, nodes_0, nodes_1
2659

    
2660
  def CheckPrereq(self):
2661
    """Check prerequisites.
2662

2663
    This checks:
2664
     - the new node is not already in the config
2665
     - it is resolvable
2666
     - its parameters (single/dual homed) matches the cluster
2667

2668
    Any errors are signaled by raising errors.OpPrereqError.
2669

2670
    """
2671
    node_name = self.op.node_name
2672
    cfg = self.cfg
2673

    
2674
    dns_data = utils.HostInfo(node_name)
2675

    
2676
    node = dns_data.name
2677
    primary_ip = self.op.primary_ip = dns_data.ip
2678
    secondary_ip = getattr(self.op, "secondary_ip", None)
2679
    if secondary_ip is None:
2680
      secondary_ip = primary_ip
2681
    if not utils.IsValidIP(secondary_ip):
2682
      raise errors.OpPrereqError("Invalid secondary IP given")
2683
    self.op.secondary_ip = secondary_ip
2684

    
2685
    node_list = cfg.GetNodeList()
2686
    if not self.op.readd and node in node_list:
2687
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2688
                                 node)
2689
    elif self.op.readd and node not in node_list:
2690
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2691

    
2692
    for existing_node_name in node_list:
2693
      existing_node = cfg.GetNodeInfo(existing_node_name)
2694

    
2695
      if self.op.readd and node == existing_node_name:
2696
        if (existing_node.primary_ip != primary_ip or
2697
            existing_node.secondary_ip != secondary_ip):
2698
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2699
                                     " address configuration as before")
2700
        continue
2701

    
2702
      if (existing_node.primary_ip == primary_ip or
2703
          existing_node.secondary_ip == primary_ip or
2704
          existing_node.primary_ip == secondary_ip or
2705
          existing_node.secondary_ip == secondary_ip):
2706
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2707
                                   " existing node %s" % existing_node.name)
2708

    
2709
    # check that the type of the node (single versus dual homed) is the
2710
    # same as for the master
2711
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2712
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2713
    newbie_singlehomed = secondary_ip == primary_ip
2714
    if master_singlehomed != newbie_singlehomed:
2715
      if master_singlehomed:
2716
        raise errors.OpPrereqError("The master has no private ip but the"
2717
                                   " new node has one")
2718
      else:
2719
        raise errors.OpPrereqError("The master has a private ip but the"
2720
                                   " new node doesn't have one")
2721

    
2722
    # checks reachability
2723
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2724
      raise errors.OpPrereqError("Node not reachable by ping")
2725

    
2726
    if not newbie_singlehomed:
2727
      # check reachability from my secondary ip to newbie's secondary ip
2728
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2729
                           source=myself.secondary_ip):
2730
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2731
                                   " based ping to noded port")
2732

    
2733
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2734
    if self.op.readd:
2735
      exceptions = [node]
2736
    else:
2737
      exceptions = []
2738
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2739
    # the new node will increase mc_max with one, so:
2740
    mc_max = min(mc_max + 1, cp_size)
2741
    self.master_candidate = mc_now < mc_max
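    # Worked example (hypothetical numbers): with candidate_pool_size=10,
    # mc_now=9 and mc_max=9 before the addition, the new node raises mc_max to
    # min(9 + 1, 10) = 10 > mc_now, so it will be added as a master candidate.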
2742

    
2743
    if self.op.readd:
2744
      self.new_node = self.cfg.GetNodeInfo(node)
2745
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2746
    else:
2747
      self.new_node = objects.Node(name=node,
2748
                                   primary_ip=primary_ip,
2749
                                   secondary_ip=secondary_ip,
2750
                                   master_candidate=self.master_candidate,
2751
                                   offline=False, drained=False)
2752

    
2753
  def Exec(self, feedback_fn):
2754
    """Adds the new node to the cluster.
2755

2756
    """
2757
    new_node = self.new_node
2758
    node = new_node.name
2759

    
2760
    # for re-adds, reset the offline/drained/master-candidate flags;
2761
    # we need to reset here, otherwise offline would prevent RPC calls
2762
    # later in the procedure; this also means that if the re-add
2763
    # fails, we are left with a non-offlined, broken node
2764
    if self.op.readd:
2765
      new_node.drained = new_node.offline = False
2766
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2767
      # if we demote the node, we do cleanup later in the procedure
2768
      new_node.master_candidate = self.master_candidate
2769

    
2770
    # notify the user about any possible mc promotion
2771
    if new_node.master_candidate:
2772
      self.LogInfo("Node will be a master candidate")
2773

    
2774
    # check connectivity
2775
    result = self.rpc.call_version([node])[node]
2776
    result.Raise("Can't get version information from node %s" % node)
2777
    if constants.PROTOCOL_VERSION == result.payload:
2778
      logging.info("Communication to node %s fine, sw version %s match",
2779
                   node, result.payload)
2780
    else:
2781
      raise errors.OpExecError("Version mismatch master version %s,"
2782
                               " node version %s" %
2783
                               (constants.PROTOCOL_VERSION, result.payload))
2784

    
2785
    # setup ssh on node
2786
    logging.info("Copy ssh key to node %s", node)
2787
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2788
    keyarray = []
2789
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2790
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2791
                priv_key, pub_key]
2792

    
2793
    for i in keyfiles:
2794
      f = open(i, 'r')
2795
      try:
2796
        keyarray.append(f.read())
2797
      finally:
2798
        f.close()
2799

    
2800
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2801
                                    keyarray[2],
2802
                                    keyarray[3], keyarray[4], keyarray[5])
2803
    result.Raise("Cannot transfer ssh keys to the new node")
2804

    
2805
    # Add node to our /etc/hosts, and add key to known_hosts
2806
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2807
      utils.AddHostToEtcHosts(new_node.name)
2808

    
2809
    if new_node.secondary_ip != new_node.primary_ip:
2810
      result = self.rpc.call_node_has_ip_address(new_node.name,
2811
                                                 new_node.secondary_ip)
2812
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2813
                   prereq=True)
2814
      if not result.payload:
2815
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2816
                                 " you gave (%s). Please fix and re-run this"
2817
                                 " command." % new_node.secondary_ip)
2818

    
2819
    node_verify_list = [self.cfg.GetMasterNode()]
2820
    node_verify_param = {
2821
      'nodelist': [node],
2822
      # TODO: do a node-net-test as well?
2823
    }
2824

    
2825
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2826
                                       self.cfg.GetClusterName())
2827
    for verifier in node_verify_list:
2828
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2829
      nl_payload = result[verifier].payload['nodelist']
2830
      if nl_payload:
2831
        for failed in nl_payload:
2832
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2833
                      (verifier, nl_payload[failed]))
2834
        raise errors.OpExecError("ssh/hostname verification failed.")
2835

    
2836
    if self.op.readd:
2837
      _RedistributeAncillaryFiles(self)
2838
      self.context.ReaddNode(new_node)
2839
      # make sure we redistribute the config
2840
      self.cfg.Update(new_node)
2841
      # and make sure the new node will not have old files around
2842
      if not new_node.master_candidate:
2843
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2844
        msg = result.fail_msg
2845
        if msg:
2846
          self.LogWarning("Node failed to demote itself from master"
2847
                          " candidate status: %s" % msg)
2848
    else:
2849
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2850
      self.context.AddNode(new_node)
2851

    
2852

    
2853
class LUSetNodeParams(LogicalUnit):
2854
  """Modifies the parameters of a node.
2855

2856
  """
2857
  HPATH = "node-modify"
2858
  HTYPE = constants.HTYPE_NODE
2859
  _OP_REQP = ["node_name"]
2860
  REQ_BGL = False
2861

    
2862
  def CheckArguments(self):
2863
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2864
    if node_name is None:
2865
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2866
    self.op.node_name = node_name
2867
    _CheckBooleanOpField(self.op, 'master_candidate')
2868
    _CheckBooleanOpField(self.op, 'offline')
2869
    _CheckBooleanOpField(self.op, 'drained')
2870
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2871
    if all_mods.count(None) == 3:
2872
      raise errors.OpPrereqError("Please pass at least one modification")
2873
    if all_mods.count(True) > 1:
2874
      raise errors.OpPrereqError("Can't set the node into more than one"
2875
                                 " state at the same time")
2876

    
2877
  def ExpandNames(self):
2878
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2879

    
2880
  def BuildHooksEnv(self):
2881
    """Build hooks env.
2882

2883
    This runs on the master node.
2884

2885
    """
2886
    env = {
2887
      "OP_TARGET": self.op.node_name,
2888
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2889
      "OFFLINE": str(self.op.offline),
2890
      "DRAINED": str(self.op.drained),
2891
      }
2892
    nl = [self.cfg.GetMasterNode(),
2893
          self.op.node_name]
2894
    return env, nl, nl
2895

    
2896
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the node's current state and the requested state
    changes for consistency.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.RemoteFailMsg()
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


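# Illustrative sketch (not part of Ganeti's cmdlib): the demotion guard in
# LUSetNodeParams.CheckPrereq boils down to a small numeric rule.  The helper
# below is a hypothetical, standalone restatement of that rule only; the
# names _example_can_demote_candidate, num_candidates and cp_size are
# assumptions for illustration and are not used anywhere else in this module.
def _example_can_demote_candidate(num_candidates, cp_size, force):
  """Return (allowed, warning) for demoting one master candidate.

  Demotion is refused when it would leave fewer candidates than the
  configured pool size, unless force is set, in which case it is
  allowed with a warning.

  """
  if num_candidates > cp_size:
    return (True, None)
  warning = ("Not enough master candidates (desired %d, new value will be %d)"
             % (cp_size, num_candidates - 1))
  return (bool(force), warning)

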
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


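# Illustrative sketch (not part of Ganeti's cmdlib): the two locking styles
# seen above, restated with plain values.  LEVEL_NODE below is a stand-in
# string (the real code uses locking.LEVEL_NODE), and the empty dict is what
# a "grab no locks" LU such as LUPowercycleNode declares; the function name
# is hypothetical.
def _example_needed_locks(node_name, grab_no_locks):
  """Return the needed_locks mapping an LU would declare."""
  LEVEL_NODE = "node"  # stand-in for locking.LEVEL_NODE
  if grab_no_locks:
    return {}
  return {LEVEL_NODE: node_name}

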
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


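# Illustrative sketch (not part of Ganeti's cmdlib): the per-field dispatch
# used in LUQueryConfigValues.Exec, restated over a plain mapping.  The
# function name and the resolvers argument are hypothetical.
def _example_query_values(output_fields, resolvers):
  """Return the resolved value for every requested field, in order.

  resolvers maps a field name to a zero-argument callable; an unknown
  field raises ValueError, mirroring errors.ParameterError above.

  """
  values = []
  for field in output_fields:
    if field not in resolvers:
      raise ValueError("Unknown output field '%s'" % field)
    values.append(resolvers[field]())
  return values

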
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


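# Illustrative sketch (not part of Ganeti's cmdlib): the two-pass ordering of
# _AssembleInstanceDisks, reduced to its control flow.  The callable
# assemble_fn(node, is_primary) is a hypothetical stand-in for the
# call_blockdev_assemble RPC and should return an error message or None.
def _example_two_pass_assemble(nodes, primary_node, assemble_fn,
                               ignore_secondaries=False):
  """Assemble on all nodes as secondary first, then on the primary.

  Returns True when no relevant error was reported.

  """
  ok = True
  # pass 1: every node in the secondary role, so DRBD peers can handshake
  for node in nodes:
    err = assemble_fn(node, False)
    if err and not ignore_secondaries:
      ok = False
  # pass 2: only the primary node is switched to the primary role
  err = assemble_fn(primary_node, True)
  if err:
    ok = False
  return ok

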
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


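# Illustrative sketch (not part of Ganeti's cmdlib): how an error is folded
# into the return value of _ShutdownInstanceDisks.  The helper below is a
# hypothetical restatement of the condition used above: an error only flips
# the overall result when it happened on a secondary node, or on the primary
# node with ignore_primary unset.
def _example_counts_as_failure(node, primary_node, ignore_primary):
  """Return True if an error on this node should fail the shutdown."""
  return not ignore_primary or node != primary_node

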
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


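# Illustrative sketch (not part of Ganeti's cmdlib): the core of the free
# memory check above, without the RPC plumbing.  The function name and
# ValueError are hypothetical stand-ins for the real OpPrereqError path.
def _example_check_free_memory(free_mem, requested_mib):
  """Raise ValueError when the reported free memory is unusable or too low.

  free_mem mirrors the 'memory_free' payload entry (an integer in MiB, or
  None when the node could not report it); requested_mib is the amount of
  memory we want to reserve.

  """
  if not isinstance(free_mem, int):
    raise ValueError("Can't compute free memory, result was '%s'" % free_mem)
  if requested_mib > free_mem:
    raise ValueError("Not enough memory: needed %s MiB, available %s MiB"
                     % (requested_mib, free_mem))

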
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


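# Illustrative sketch (not part of Ganeti's cmdlib): the precedence used when
# LUStartupInstance.CheckPrereq builds filled_hvp -- cluster defaults first,
# then the per-instance values, then the one-off overrides passed with the
# opcode.  Plain dicts stand in for the real parameter objects; for this
# purpose objects.FillDict followed by update() behaves like the chain below.
# The helper name is hypothetical.
def _example_fill_params(cluster_defaults, instance_params, overrides):
  """Return the effective parameter dict, with later sources winning."""
  filled = dict(cluster_defaults)
  filled.update(instance_params)
  filled.update(overrides)
  return filled

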
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


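# Illustrative sketch (not part of Ganeti's cmdlib): the three reboot types
# handled in LURebootInstance.Exec.  The plain strings and the returned step
# lists are hypothetical stand-ins; soft/hard reboots go through a single
# reboot call on the primary node, while a full reboot is a shutdown/start
# cycle of both the instance and its disks.
def _example_reboot_plan(reboot_type):
  """Return the list of steps a given reboot type implies."""
  if reboot_type in ("soft", "hard"):
    return ["reboot instance on primary node"]
  return ["shutdown instance", "shutdown disks", "start disks",
          "start instance"]

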
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURecreateInstanceDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    if not isinstance(self.op.disks, list):
      raise errors.OpPrereqError("Invalid disks parameter")
    for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item))

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))
    else:
      for idx in self.op.disks:
        if idx >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    to_skip = []
    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.op.disks: # disk idx has not been passed in
        to_skip.append(idx)
        continue

    _CreateDisks(self, self.instance, to_skip=to_skip)


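# Illustrative sketch (not part of Ganeti's cmdlib): the skip-list computed
# in LURecreateInstanceDisks.Exec, as a standalone helper with hypothetical
# names.  Disks whose index was not requested end up in the skip list that is
# handed to _CreateDisks.
def _example_disks_to_skip(disk_count, wanted_indices):
  """Return the disk indices that should not be recreated."""
  return [idx for idx in range(disk_count) if idx not in wanted_indices]

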
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


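# Illustrative sketch (not part of Ganeti's cmdlib): for file-based
# instances, LURenameInstance.Exec derives the old and new storage
# directories from the first disk's logical_id via os.path.dirname, as
# restated below.  The helper name and the plain-tuple logical_id shape are
# assumptions for illustration.
def _example_file_storage_dir(logical_id):
  """Return the storage directory for a file-backed disk's logical_id.

  logical_id is expected to look like (driver, "/path/to/dir/disk0").

  """
  return os.path.dirname(logical_id[1])

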
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
                                    r"(nic)\.(bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|modes|links|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",
                                    "ctime", "mtime",
                                    ] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline: