lib/cmdlib.py @ cd46f3b4


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import re
30
import platform
31
import logging
32
import copy
33

    
34
from ganeti import ssh
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import serializer
42
from ganeti import ssconf
43

    
44

    
45
class LogicalUnit(object):
46
  """Logical Unit base class.
47

48
  Subclasses must follow these rules:
49
    - implement ExpandNames
50
    - implement CheckPrereq (except when tasklets are used)
51
    - implement Exec (except when tasklets are used)
52
    - implement BuildHooksEnv
53
    - redefine HPATH and HTYPE
54
    - optionally redefine their run requirements:
55
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56

57
  Note that all commands require root permissions.
58

59
  @ivar dry_run_result: the value (if any) that will be returned to the caller
60
      in dry-run mode (signalled by opcode dry_run parameter)
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_BGL = True
67

    
68
  def __init__(self, processor, op, context, rpc):
69
    """Constructor for LogicalUnit.
70

71
    This needs to be overridden in derived classes in order to check op
72
    validity.
73

74
    """
75
    self.proc = processor
76
    self.op = op
77
    self.cfg = context.cfg
78
    self.context = context
79
    self.rpc = rpc
80
    # Dicts used to declare locking needs to mcpu
81
    self.needed_locks = None
82
    self.acquired_locks = {}
83
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
84
    self.add_locks = {}
85
    self.remove_locks = {}
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89
    # logging
90
    self.LogWarning = processor.LogWarning
91
    self.LogInfo = processor.LogInfo
92
    self.LogStep = processor.LogStep
93
    # support for dry-run
94
    self.dry_run_result = None
95

    
96
    # Tasklets
97
    self.tasklets = None
98

    
99
    for attr_name in self._OP_REQP:
100
      attr_val = getattr(op, attr_name, None)
101
      if attr_val is None:
102
        raise errors.OpPrereqError("Required parameter '%s' missing" %
103
                                   attr_name)
104

    
105
    self.CheckArguments()
106

    
107
  def __GetSSH(self):
108
    """Returns the SshRunner object
109

110
    """
111
    if not self.__ssh:
112
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113
    return self.__ssh
114

    
115
  ssh = property(fget=__GetSSH)
116

    
117
  def CheckArguments(self):
118
    """Check syntactic validity for the opcode arguments.
119

120
    This method is for doing a simple syntactic check and ensuring the
121
    validity of opcode parameters, without any cluster-related
122
    checks. While the same can be accomplished in ExpandNames and/or
123
    CheckPrereq, doing these separately is better because:
124

125
      - ExpandNames is left as purely a lock-related function
126
      - CheckPrereq is run after we have acquired locks (and possibly
127
        waited for them)
128

129
    The function is allowed to change the self.op attribute so that
130
    later methods need no longer worry about missing parameters.
131

132
    """
133
    pass
134

    
135
  def ExpandNames(self):
136
    """Expand names for this LU.
137

138
    This method is called before starting to execute the opcode, and it should
139
    update all the parameters of the opcode to their canonical form (e.g. a
140
    short node name must be fully expanded after this method has successfully
141
    completed). This way locking, hooks, logging, etc. can work correctly.
142

143
    LUs which implement this method must also populate the self.needed_locks
144
    member, as a dict with lock levels as keys, and a list of needed lock names
145
    as values. Rules:
146

147
      - use an empty dict if you don't need any lock
148
      - if you don't need any lock at a particular level omit that level
149
      - don't put anything for the BGL level
150
      - if you want all locks at a level use locking.ALL_SET as a value
151

152
    If you need to share locks (rather than acquire them exclusively) at one
153
    level you can modify self.share_locks, setting a true value (usually 1) for
154
    that level. By default locks are not shared.
155

156
    This function can also define a list of tasklets, which then will be
157
    executed in order instead of the usual LU-level CheckPrereq and Exec
158
    functions, if those are not defined by the LU.
159

160
    Examples::
161

162
      # Acquire all nodes and one instance
163
      self.needed_locks = {
164
        locking.LEVEL_NODE: locking.ALL_SET,
165
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
166
      }
167
      # Acquire just two nodes
168
      self.needed_locks = {
169
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
170
      }
171
      # Acquire no locks
172
      self.needed_locks = {} # No, you can't leave it to the default value None
173

174
    """
175
    # The implementation of this method is mandatory only if the new LU is
176
    # concurrent, so that old LUs don't need to be changed all at the same
177
    # time.
178
    if self.REQ_BGL:
179
      self.needed_locks = {} # Exclusive LUs don't need locks.
180
    else:
181
      raise NotImplementedError
182

    
183
  def DeclareLocks(self, level):
184
    """Declare LU locking needs for a level
185

186
    While most LUs can just declare their locking needs at ExpandNames time,
187
    sometimes there's the need to calculate some locks after having acquired
188
    the ones before. This function is called just before acquiring locks at a
189
    particular level, but after acquiring the ones at lower levels, and permits
190
    such calculations. It can be used to modify self.needed_locks, and by
191
    default it does nothing.
192

193
    This function is only called if you have something already set in
194
    self.needed_locks for the level.
195

196
    @param level: Locking level which is going to be locked
197
    @type level: member of ganeti.locking.LEVELS
198

199
    """
200

    
201
  def CheckPrereq(self):
202
    """Check prerequisites for this LU.
203

204
    This method should check that the prerequisites for the execution
205
    of this LU are fulfilled. It can do internode communication, but
206
    it should be idempotent - no cluster or system changes are
207
    allowed.
208

209
    The method should raise errors.OpPrereqError in case something is
210
    not fulfilled. Its return value is ignored.
211

212
    This method should also update all the parameters of the opcode to
213
    their canonical form if it hasn't been done by ExpandNames before.
214

215
    """
216
    if self.tasklets is not None:
217
      for (idx, tl) in enumerate(self.tasklets):
218
        logging.debug("Checking prerequisites for tasklet %s/%s",
219
                      idx + 1, len(self.tasklets))
220
        tl.CheckPrereq()
221
    else:
222
      raise NotImplementedError
223

    
224
  def Exec(self, feedback_fn):
225
    """Execute the LU.
226

227
    This method should implement the actual work. It should raise
228
    errors.OpExecError for failures that are somewhat dealt with in
229
    code, or expected.
230

231
    """
232
    if self.tasklets is not None:
233
      for (idx, tl) in enumerate(self.tasklets):
234
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
235
        tl.Exec(feedback_fn)
236
    else:
237
      raise NotImplementedError
238

    
239
  def BuildHooksEnv(self):
240
    """Build hooks environment for this LU.
241

242
    This method should return a three-element tuple consisting of: a dict
243
    containing the environment that will be used for running the
244
    specific hook for this LU, a list of node names on which the hook
245
    should run before the execution, and a list of node names on which
246
    the hook should run after the execution.
247

248
    The keys of the dict must not be prefixed with 'GANETI_' as this will
249
    be handled in the hooks runner. Also note additional keys will be
250
    added by the hooks runner. If the LU doesn't define any
251
    environment, an empty dict (and not None) should be returned.
252

253
    An empty set of nodes should be returned as an empty list (and not None).
254

255
    Note that if the HPATH for a LU class is None, this function will
256
    not be called.
257

258
    """
259
    raise NotImplementedError
260

    
261
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262
    """Notify the LU about the results of its hooks.
263

264
    This method is called every time a hooks phase is executed, and notifies
265
    the Logical Unit about the hooks' result. The LU can then use it to alter
266
    its result based on the hooks.  By default the method does nothing and the
267
    previous result is passed back unchanged, but any LU can override it if it
268
    wants to use the local cluster hook-scripts somehow.
269

270
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
271
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272
    @param hook_results: the results of the multi-node hooks rpc call
273
    @param feedback_fn: function used to send feedback back to the caller
274
    @param lu_result: the previous Exec result this LU had, or None
275
        in the PRE phase
276
    @return: the new Exec result, based on the previous result
277
        and hook results
278

279
    """
280
    return lu_result
281

    
282
  def _ExpandAndLockInstance(self):
283
    """Helper function to expand and lock an instance.
284

285
    Many LUs that work on an instance take its name in self.op.instance_name
286
    and need to expand it and then declare the expanded name for locking. This
287
    function does it, and then updates self.op.instance_name to the expanded
288
    name. It also initializes needed_locks as a dict, if this hasn't been done
289
    before.
290

291
    """
292
    if self.needed_locks is None:
293
      self.needed_locks = {}
294
    else:
295
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296
        "_ExpandAndLockInstance called with instance-level locks set"
297
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298
    if expanded_name is None:
299
      raise errors.OpPrereqError("Instance '%s' not known" %
300
                                  self.op.instance_name)
301
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302
    self.op.instance_name = expanded_name
303

    
304
  def _LockInstancesNodes(self, primary_only=False):
305
    """Helper function to declare instances' nodes for locking.
306

307
    This function should be called after locking one or more instances to lock
308
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309
    with all primary or secondary nodes for instances already locked and
310
    present in self.needed_locks[locking.LEVEL_INSTANCE].
311

312
    It should be called from DeclareLocks, and for safety only works if
313
    self.recalculate_locks[locking.LEVEL_NODE] is set.
314

315
    In the future it may grow parameters to just lock some instances' nodes, or
316
    to just lock primaries or secondary nodes, if needed.
317

318
    It should be called in DeclareLocks in a way similar to::
319

320
      if level == locking.LEVEL_NODE:
321
        self._LockInstancesNodes()
322

323
    @type primary_only: boolean
324
    @param primary_only: only lock primary nodes of locked instances
325

326
    """
327
    assert locking.LEVEL_NODE in self.recalculate_locks, \
328
      "_LockInstancesNodes helper function called with no nodes to recalculate"
329

    
330
    # TODO: check if we've really been called with the instance locks held
331

    
332
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333
    # future we might want to have different behaviors depending on the value
334
    # of self.recalculate_locks[locking.LEVEL_NODE]
335
    wanted_nodes = []
336
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337
      instance = self.context.cfg.GetInstanceInfo(instance_name)
338
      wanted_nodes.append(instance.primary_node)
339
      if not primary_only:
340
        wanted_nodes.extend(instance.secondary_nodes)
341

    
342
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
346

    
347
    del self.recalculate_locks[locking.LEVEL_NODE]
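
# The following is an illustrative sketch, not part of Ganeti: a minimal
# LogicalUnit subclass showing the contract documented above.  It declares
# instance and node locks in ExpandNames, recalculates node locks in
# DeclareLocks via _LockInstancesNodes, validates state in CheckPrereq and
# does the actual work in Exec.  The class name and the use of an
# 'instance_name' opcode field are assumptions made for the example only.
class _LUExampleInstanceOp(LogicalUnit):
  """Example-only LU; never registered with the opcode dispatcher."""
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    feedback_fn("Would operate on instance %s" % self.instance.name)
    return self.instance.name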
348

    
349

    
350
class NoHooksLU(LogicalUnit):
351
  """Simple LU which runs no hooks.
352

353
  This LU is intended as a parent for other LogicalUnits which will
354
  run no hooks, in order to reduce duplicate code.
355

356
  """
357
  HPATH = None
358
  HTYPE = None
359

    
360

    
361
class Tasklet:
362
  """Tasklet base class.
363

364
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365
  they can mix legacy code with tasklets. Locking needs to be done in the LU;
366
  tasklets know nothing about locks.
367

368
  Subclasses must follow these rules:
369
    - Implement CheckPrereq
370
    - Implement Exec
371

372
  """
373
  def __init__(self, lu):
374
    self.lu = lu
375

    
376
    # Shortcuts
377
    self.cfg = lu.cfg
378
    self.rpc = lu.rpc
379

    
380
  def CheckPrereq(self):
381
    """Check prerequisites for this tasklet.
382

383
    This method should check whether the prerequisites for the execution of
384
    this tasklet are fulfilled. It can do internode communication, but it
385
    should be idempotent - no cluster or system changes are allowed.
386

387
    The method should raise errors.OpPrereqError in case something is not
388
    fulfilled. Its return value is ignored.
389

390
    This method should also update all parameters to their canonical form if it
391
    hasn't been done before.
392

393
    """
394
    raise NotImplementedError
395

    
396
  def Exec(self, feedback_fn):
397
    """Execute the tasklet.
398

399
    This method should implement the actual work. It should raise
400
    errors.OpExecError for failures that are somewhat dealt with in code, or
401
    expected.
402

403
    """
404
    raise NotImplementedError
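
# The following is an illustrative sketch, not part of Ganeti: a minimal
# Tasklet subclass and the way an LU would hand work over to it.  As
# documented above, the LU does all the locking; the tasklet only uses the
# configuration and RPC shortcuts set up by Tasklet.__init__.  All names
# are assumptions made for the example only.
class _ExampleReportTasklet(Tasklet):
  """Example-only tasklet reporting an instance's primary node."""
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name
    self.instance = None

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("%s runs on %s" %
                (self.instance.name, self.instance.primary_node))

# An LU using tasklets would set, in its ExpandNames (sketch only,
# assuming an 'instances' opcode field):
#
#   self.tasklets = [_ExampleReportTasklet(self, name)
#                    for name in self.op.instances]
#
# after which the default LogicalUnit.CheckPrereq and Exec shown above
# iterate over self.tasklets instead of requiring LU-level overrides.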
405

    
406

    
407
def _GetWantedNodes(lu, nodes):
408
  """Returns list of checked and expanded node names.
409

410
  @type lu: L{LogicalUnit}
411
  @param lu: the logical unit on whose behalf we execute
412
  @type nodes: list
413
  @param nodes: non-empty list of node names to expand
414
  @rtype: list
415
  @return: the list of nodes, sorted
416
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
417

418
  """
419
  if not isinstance(nodes, list):
420
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
421

    
422
  if not nodes:
423
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424
      " non-empty list of nodes whose name is to be expanded.")
425

    
426
  wanted = []
427
  for name in nodes:
428
    node = lu.cfg.ExpandNodeName(name)
429
    if node is None:
430
      raise errors.OpPrereqError("No such node name '%s'" % name)
431
    wanted.append(node)
432

    
433
  return utils.NiceSort(wanted)
434

    
435

    
436
def _GetWantedInstances(lu, instances):
437
  """Returns list of checked and expanded instance names.
438

439
  @type lu: L{LogicalUnit}
440
  @param lu: the logical unit on whose behalf we execute
441
  @type instances: list
442
  @param instances: list of instance names, or an empty list for all instances
443
  @rtype: list
444
  @return: the list of instances, sorted
445
  @raise errors.OpPrereqError: if the instances parameter is wrong type
446
  @raise errors.OpPrereqError: if any of the passed instances is not found
447

448
  """
449
  if not isinstance(instances, list):
450
    raise errors.OpPrereqError("Invalid argument type 'instances'")
451

    
452
  if instances:
453
    wanted = []
454

    
455
    for name in instances:
456
      instance = lu.cfg.ExpandInstanceName(name)
457
      if instance is None:
458
        raise errors.OpPrereqError("No such instance name '%s'" % name)
459
      wanted.append(instance)
460

    
461
  else:
462
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
463
  return wanted
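
# The following is an illustrative sketch, not part of Ganeti: typical use
# of the two expansion helpers above from an LU's CheckPrereq.  The opcode
# fields used here are assumptions for the example; note that
# _GetWantedNodes requires a non-empty list, while an empty list passed to
# _GetWantedInstances selects all instances.
def _ExampleExpandOpTargets(lu):
  """Example-only helper resolving opcode targets to canonical names."""
  lu.wanted_instances = _GetWantedInstances(lu, lu.op.instances)
  if lu.op.nodes:
    lu.wanted_nodes = _GetWantedNodes(lu, lu.op.nodes)
  else:
    lu.wanted_nodes = utils.NiceSort(lu.cfg.GetNodeList())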
464

    
465

    
466
def _CheckOutputFields(static, dynamic, selected):
467
  """Checks whether all selected fields are valid.
468

469
  @type static: L{utils.FieldSet}
470
  @param static: static fields set
471
  @type dynamic: L{utils.FieldSet}
472
  @param dynamic: dynamic fields set
473

474
  """
475
  f = utils.FieldSet()
476
  f.Extend(static)
477
  f.Extend(dynamic)
478

    
479
  delta = f.NonMatching(selected)
480
  if delta:
481
    raise errors.OpPrereqError("Unknown output fields selected: %s"
482
                               % ",".join(delta))
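
# The following is an illustrative sketch, not part of Ganeti: how a
# query-style LU would typically validate its 'output_fields' opcode
# parameter with the helper above.  The concrete field names and the
# FieldSet construction are assumptions made for the example only.
def _ExampleCheckQueryFields(lu):
  """Example-only validation of user-selected output fields."""
  static_fields = utils.FieldSet("name", "pnode", "snodes")
  dynamic_fields = utils.FieldSet("oper_state", "oper_ram")
  # Raises errors.OpPrereqError naming any unknown fields
  _CheckOutputFields(static=static_fields, dynamic=dynamic_fields,
                     selected=lu.op.output_fields)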
483

    
484

    
485
def _CheckBooleanOpField(op, name):
486
  """Validates boolean opcode parameters.
487

488
  This will ensure that an opcode parameter is either a boolean value,
489
  or None (but that it always exists).
490

491
  """
492
  val = getattr(op, name, None)
493
  if not (val is None or isinstance(val, bool)):
494
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
495
                               (name, str(val)))
496
  setattr(op, name, val)
497

    
498

    
499
def _CheckNodeOnline(lu, node):
500
  """Ensure that a given node is online.
501

502
  @param lu: the LU on behalf of which we make the check
503
  @param node: the node to check
504
  @raise errors.OpPrereqError: if the node is offline
505

506
  """
507
  if lu.cfg.GetNodeInfo(node).offline:
508
    raise errors.OpPrereqError("Can't use offline node %s" % node)
509

    
510

    
511
def _CheckNodeNotDrained(lu, node):
512
  """Ensure that a given node is not drained.
513

514
  @param lu: the LU on behalf of which we make the check
515
  @param node: the node to check
516
  @raise errors.OpPrereqError: if the node is drained
517

518
  """
519
  if lu.cfg.GetNodeInfo(node).drained:
520
    raise errors.OpPrereqError("Can't use drained node %s" % node)
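
# The following is an illustrative sketch, not part of Ganeti: the two
# node-state helpers above are meant to be called from CheckPrereq before
# committing to use a node, for example when validating the target of a
# move or failover.  The function name and parameters are assumptions made
# for the example only.
def _ExampleCheckTargetNode(lu, target_node):
  """Example-only prerequisite check for a target node."""
  _CheckNodeOnline(lu, target_node)
  _CheckNodeNotDrained(lu, target_node)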
521

    
522

    
523
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524
                          memory, vcpus, nics, disk_template, disks,
525
                          bep, hvp, hypervisor_name):
526
  """Builds instance related env variables for hooks
527

528
  This builds the hook environment from individual variables.
529

530
  @type name: string
531
  @param name: the name of the instance
532
  @type primary_node: string
533
  @param primary_node: the name of the instance's primary node
534
  @type secondary_nodes: list
535
  @param secondary_nodes: list of secondary nodes as strings
536
  @type os_type: string
537
  @param os_type: the name of the instance's OS
538
  @type status: boolean
539
  @param status: the should_run status of the instance
540
  @type memory: string
541
  @param memory: the memory size of the instance
542
  @type vcpus: string
543
  @param vcpus: the count of VCPUs the instance has
544
  @type nics: list
545
  @param nics: list of tuples (ip, mac, mode, link) representing
546
      the NICs the instance has
547
  @type disk_template: string
548
  @param disk_template: the disk template of the instance
549
  @type disks: list
550
  @param disks: the list of (size, mode) pairs
551
  @type bep: dict
552
  @param bep: the backend parameters for the instance
553
  @type hvp: dict
554
  @param hvp: the hypervisor parameters for the instance
555
  @type hypervisor_name: string
556
  @param hypervisor_name: the hypervisor for the instance
557
  @rtype: dict
558
  @return: the hook environment for this instance
559

560
  """
561
  if status:
562
    str_status = "up"
563
  else:
564
    str_status = "down"
565
  env = {
566
    "OP_TARGET": name,
567
    "INSTANCE_NAME": name,
568
    "INSTANCE_PRIMARY": primary_node,
569
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570
    "INSTANCE_OS_TYPE": os_type,
571
    "INSTANCE_STATUS": str_status,
572
    "INSTANCE_MEMORY": memory,
573
    "INSTANCE_VCPUS": vcpus,
574
    "INSTANCE_DISK_TEMPLATE": disk_template,
575
    "INSTANCE_HYPERVISOR": hypervisor_name,
576
  }
577

    
578
  if nics:
579
    nic_count = len(nics)
580
    for idx, (ip, mac, mode, link) in enumerate(nics):
581
      if ip is None:
582
        ip = ""
583
      env["INSTANCE_NIC%d_IP" % idx] = ip
584
      env["INSTANCE_NIC%d_MAC" % idx] = mac
585
      env["INSTANCE_NIC%d_MODE" % idx] = mode
586
      env["INSTANCE_NIC%d_LINK" % idx] = link
587
      if mode == constants.NIC_MODE_BRIDGED:
588
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
589
  else:
590
    nic_count = 0
591

    
592
  env["INSTANCE_NIC_COUNT"] = nic_count
593

    
594
  if disks:
595
    disk_count = len(disks)
596
    for idx, (size, mode) in enumerate(disks):
597
      env["INSTANCE_DISK%d_SIZE" % idx] = size
598
      env["INSTANCE_DISK%d_MODE" % idx] = mode
599
  else:
600
    disk_count = 0
601

    
602
  env["INSTANCE_DISK_COUNT"] = disk_count
603

    
604
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
605
    for key, value in source.items():
606
      env["INSTANCE_%s_%s" % (kind, key)] = value
607

    
608
  return env
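
# The following is an illustrative sketch, not part of Ganeti: a direct
# call to the helper above with hypothetical values, showing the kind of
# environment a hook script would see.  All concrete values (names,
# addresses, sizes, hypervisor) are assumptions made for the example only.
def _ExampleInstanceHookEnv():
  """Example-only invocation of _BuildInstanceHookEnv."""
  env = _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
                              ["node2.example.com"], "debian-image",
                              True, 512, 1,
                              [("192.0.2.10", "aa:00:00:11:22:33",
                                constants.NIC_MODE_BRIDGED, "xen-br0")],
                              "drbd", [(10240, "rw")],
                              {"auto_balance": True}, {}, "xen-pvm")
  # env now contains e.g. INSTANCE_PRIMARY, INSTANCE_NIC_COUNT,
  # INSTANCE_NIC0_BRIDGE, INSTANCE_DISK0_SIZE and INSTANCE_BE_auto_balance,
  # all still without the GANETI_ prefix added by the hooks runner.
  return env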
609

    
610

    
611
def _NICListToTuple(lu, nics):
612
  """Build a list of nic information tuples.
613

614
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615
  value in LUQueryInstanceData.
616

617
  @type lu:  L{LogicalUnit}
618
  @param lu: the logical unit on whose behalf we execute
619
  @type nics: list of L{objects.NIC}
620
  @param nics: list of nics to convert to hooks tuples
621

622
  """
623
  hooks_nics = []
624
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
625
  for nic in nics:
626
    ip = nic.ip
627
    mac = nic.mac
628
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629
    mode = filled_params[constants.NIC_MODE]
630
    link = filled_params[constants.NIC_LINK]
631
    hooks_nics.append((ip, mac, mode, link))
632
  return hooks_nics
633

    
634

    
635
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636
  """Builds instance related env variables for hooks from an object.
637

638
  @type lu: L{LogicalUnit}
639
  @param lu: the logical unit on whose behalf we execute
640
  @type instance: L{objects.Instance}
641
  @param instance: the instance for which we should build the
642
      environment
643
  @type override: dict
644
  @param override: dictionary with key/values that will override
645
      our values
646
  @rtype: dict
647
  @return: the hook environment dictionary
648

649
  """
650
  cluster = lu.cfg.GetClusterInfo()
651
  bep = cluster.FillBE(instance)
652
  hvp = cluster.FillHV(instance)
653
  args = {
654
    'name': instance.name,
655
    'primary_node': instance.primary_node,
656
    'secondary_nodes': instance.secondary_nodes,
657
    'os_type': instance.os,
658
    'status': instance.admin_up,
659
    'memory': bep[constants.BE_MEMORY],
660
    'vcpus': bep[constants.BE_VCPUS],
661
    'nics': _NICListToTuple(lu, instance.nics),
662
    'disk_template': instance.disk_template,
663
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
664
    'bep': bep,
665
    'hvp': hvp,
666
    'hypervisor_name': instance.hypervisor,
667
  }
668
  if override:
669
    args.update(override)
670
  return _BuildInstanceHookEnv(**args)
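
# The following is an illustrative sketch, not part of Ganeti: typical use
# of the helper above from an LU's BuildHooksEnv, with the optional
# override dict showing a value that is about to change.  It assumes the
# LU stored its instance object as lu.instance in CheckPrereq; the
# function name, the new_memory parameter and the node list are likewise
# assumptions made for the example only.
def _ExampleBuildHooksEnvForResize(lu, new_memory):
  """Example-only hooks environment for a hypothetical memory change."""
  env = _BuildInstanceHookEnvByObject(lu, lu.instance,
                                      override={"INSTANCE_MEMORY":
                                                new_memory})
  nl = [lu.cfg.GetMasterNode(), lu.instance.primary_node]
  return env, nl, nl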
671

    
672

    
673
def _AdjustCandidatePool(lu):
674
  """Adjust the candidate pool after node operations.
675

676
  """
677
  mod_list = lu.cfg.MaintainCandidatePool()
678
  if mod_list:
679
    lu.LogInfo("Promoted nodes to master candidate role: %s",
680
               ", ".join(node.name for node in mod_list))
681
    for name in mod_list:
682
      lu.context.ReaddNode(name)
683
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
684
  if mc_now > mc_max:
685
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
686
               (mc_now, mc_max))
687

    
688

    
689
def _CheckNicsBridgesExist(lu, target_nics, target_node,
690
                               profile=constants.PP_DEFAULT):
691
  """Check that the brigdes needed by a list of nics exist.
692

693
  """
694
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696
                for nic in target_nics]
697
  brlist = [params[constants.NIC_LINK] for params in paramslist
698
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
699
  if brlist:
700
    result = lu.rpc.call_bridges_exist(target_node, brlist)
701
    result.Raise("Error checking bridges on destination node '%s'" %
702
                 target_node, prereq=True)
703

    
704

    
705
def _CheckInstanceBridgesExist(lu, instance, node=None):
706
  """Check that the bridges needed by an instance exist.
707

708
  """
709
  if node is None:
710
    node = instance.primary_node
711
  _CheckNicsBridgesExist(lu, instance.nics, node)
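
# The following is an illustrative sketch, not part of Ganeti: before
# starting or moving an instance, an LU would typically combine the
# node-state checks with the bridge check above for the node that is going
# to run the instance.  The function name and parameters are assumptions
# made for the example only.
def _ExampleCheckTargetForInstance(lu, instance, target_node):
  """Example-only prerequisite checks for a hypothetical target node."""
  _CheckNodeOnline(lu, target_node)
  _CheckNodeNotDrained(lu, target_node)
  # Raises errors.OpPrereqError if a required bridge is missing
  _CheckInstanceBridgesExist(lu, instance, node=target_node)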
712

    
713

    
714
def _GetNodeInstancesInner(cfg, fn):
715
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
716

    
717

    
718
def _GetNodeInstances(cfg, node_name):
719
  """Returns a list of all primary and secondary instances on a node.
720

721
  """
722

    
723
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
724

    
725

    
726
def _GetNodePrimaryInstances(cfg, node_name):
727
  """Returns primary instances on a node.
728

729
  """
730
  return _GetNodeInstancesInner(cfg,
731
                                lambda inst: node_name == inst.primary_node)
732

    
733

    
734
def _GetNodeSecondaryInstances(cfg, node_name):
735
  """Returns secondary instances on a node.
736

737
  """
738
  return _GetNodeInstancesInner(cfg,
739
                                lambda inst: node_name in inst.secondary_nodes)
740

    
741

    
742
def _GetStorageTypeArgs(cfg, storage_type):
743
  """Returns the arguments for a storage type.
744

745
  """
746
  # Special case for file storage
747
  if storage_type == constants.ST_FILE:
748
    # storage.FileStorage wants a list of storage directories
749
    return [[cfg.GetFileStorageDir()]]
750

    
751
  return []
752

    
753

    
754
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
755
  faulty = []
756

    
757
  for dev in instance.disks:
758
    cfg.SetDiskID(dev, node_name)
759

    
760
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
761
  result.Raise("Failed to get disk status from node %s" % node_name,
762
               prereq=prereq)
763

    
764
  for idx, bdev_status in enumerate(result.payload):
765
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
766
      faulty.append(idx)
767

    
768
  return faulty
769

    
770

    
771
class LUPostInitCluster(LogicalUnit):
772
  """Logical unit for running hooks after cluster initialization.
773

774
  """
775
  HPATH = "cluster-init"
776
  HTYPE = constants.HTYPE_CLUSTER
777
  _OP_REQP = []
778

    
779
  def BuildHooksEnv(self):
780
    """Build hooks env.
781

782
    """
783
    env = {"OP_TARGET": self.cfg.GetClusterName()}
784
    mn = self.cfg.GetMasterNode()
785
    return env, [], [mn]
786

    
787
  def CheckPrereq(self):
788
    """No prerequisites to check.
789

790
    """
791
    return True
792

    
793
  def Exec(self, feedback_fn):
794
    """Nothing to do.
795

796
    """
797
    return True
798

    
799

    
800
class LUDestroyCluster(NoHooksLU):
801
  """Logical unit for destroying the cluster.
802

803
  """
804
  _OP_REQP = []
805

    
806
  def CheckPrereq(self):
807
    """Check prerequisites.
808

809
    This checks whether the cluster is empty.
810

811
    Any errors are signaled by raising errors.OpPrereqError.
812

813
    """
814
    master = self.cfg.GetMasterNode()
815

    
816
    nodelist = self.cfg.GetNodeList()
817
    if len(nodelist) != 1 or nodelist[0] != master:
818
      raise errors.OpPrereqError("There are still %d node(s) in"
819
                                 " this cluster." % (len(nodelist) - 1))
820
    instancelist = self.cfg.GetInstanceList()
821
    if instancelist:
822
      raise errors.OpPrereqError("There are still %d instance(s) in"
823
                                 " this cluster." % len(instancelist))
824

    
825
  def Exec(self, feedback_fn):
826
    """Destroys the cluster.
827

828
    """
829
    master = self.cfg.GetMasterNode()
830
    result = self.rpc.call_node_stop_master(master, False)
831
    result.Raise("Could not disable the master role")
832
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
833
    utils.CreateBackup(priv_key)
834
    utils.CreateBackup(pub_key)
835
    return master
836

    
837

    
838
class LUVerifyCluster(LogicalUnit):
839
  """Verifies the cluster status.
840

841
  """
842
  HPATH = "cluster-verify"
843
  HTYPE = constants.HTYPE_CLUSTER
844
  _OP_REQP = ["skip_checks"]
845
  REQ_BGL = False
846

    
847
  def ExpandNames(self):
848
    self.needed_locks = {
849
      locking.LEVEL_NODE: locking.ALL_SET,
850
      locking.LEVEL_INSTANCE: locking.ALL_SET,
851
    }
852
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
853

    
854
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
855
                  node_result, feedback_fn, master_files,
856
                  drbd_map, vg_name):
857
    """Run multiple tests against a node.
858

859
    Test list:
860

861
      - compares ganeti version
862
      - checks vg existence and size > 20G
863
      - checks config file checksum
864
      - checks ssh to other nodes
865

866
    @type nodeinfo: L{objects.Node}
867
    @param nodeinfo: the node to check
868
    @param file_list: required list of files
869
    @param local_cksum: dictionary of local files and their checksums
870
    @param node_result: the results from the node
871
    @param feedback_fn: function used to accumulate results
872
    @param master_files: list of files that only masters should have
873
    @param drbd_map: the used DRBD minors for this node, in the
874
        form of minor: (instance, must_exist) which correspond to instances
875
        and their running status
876
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
877

878
    """
879
    node = nodeinfo.name
880

    
881
    # main result, node_result should be a non-empty dict
882
    if not node_result or not isinstance(node_result, dict):
883
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
884
      return True
885

    
886
    # compares ganeti version
887
    local_version = constants.PROTOCOL_VERSION
888
    remote_version = node_result.get('version', None)
889
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
890
            len(remote_version) == 2):
891
      feedback_fn("  - ERROR: connection to %s failed" % (node))
892
      return True
893

    
894
    if local_version != remote_version[0]:
895
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
896
                  " node %s %s" % (local_version, node, remote_version[0]))
897
      return True
898

    
899
    # node seems compatible, we can actually try to look into its results
900

    
901
    bad = False
902

    
903
    # full package version
904
    if constants.RELEASE_VERSION != remote_version[1]:
905
      feedback_fn("  - WARNING: software version mismatch: master %s,"
906
                  " node %s %s" %
907
                  (constants.RELEASE_VERSION, node, remote_version[1]))
908

    
909
    # checks vg existence and size > 20G
910
    if vg_name is not None:
911
      vglist = node_result.get(constants.NV_VGLIST, None)
912
      if not vglist:
913
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
914
                        (node,))
915
        bad = True
916
      else:
917
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
918
                                              constants.MIN_VG_SIZE)
919
        if vgstatus:
920
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
921
          bad = True
922

    
923
    # checks config file checksum
924

    
925
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
926
    if not isinstance(remote_cksum, dict):
927
      bad = True
928
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
929
    else:
930
      for file_name in file_list:
931
        node_is_mc = nodeinfo.master_candidate
932
        must_have_file = file_name not in master_files
933
        if file_name not in remote_cksum:
934
          if node_is_mc or must_have_file:
935
            bad = True
936
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
937
        elif remote_cksum[file_name] != local_cksum[file_name]:
938
          if node_is_mc or must_have_file:
939
            bad = True
940
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
941
          else:
942
            # not candidate and this is not a must-have file
943
            bad = True
944
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
945
                        " candidates (and the file is outdated)" % file_name)
946
        else:
947
          # all good, except non-master/non-must have combination
948
          if not node_is_mc and not must_have_file:
949
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
950
                        " candidates" % file_name)
951

    
952
    # checks ssh to any
953

    
954
    if constants.NV_NODELIST not in node_result:
955
      bad = True
956
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
957
    else:
958
      if node_result[constants.NV_NODELIST]:
959
        bad = True
960
        for node in node_result[constants.NV_NODELIST]:
961
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
962
                          (node, node_result[constants.NV_NODELIST][node]))
963

    
964
    if constants.NV_NODENETTEST not in node_result:
965
      bad = True
966
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
967
    else:
968
      if node_result[constants.NV_NODENETTEST]:
969
        bad = True
970
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
971
        for node in nlist:
972
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
973
                          (node, node_result[constants.NV_NODENETTEST][node]))
974

    
975
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
976
    if isinstance(hyp_result, dict):
977
      for hv_name, hv_result in hyp_result.iteritems():
978
        if hv_result is not None:
979
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
980
                      (hv_name, hv_result))
981

    
982
    # check used drbd list
983
    if vg_name is not None:
984
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
985
      if not isinstance(used_minors, (tuple, list)):
986
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
987
                    str(used_minors))
988
      else:
989
        for minor, (iname, must_exist) in drbd_map.items():
990
          if minor not in used_minors and must_exist:
991
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
992
                        " not active" % (minor, iname))
993
            bad = True
994
        for minor in used_minors:
995
          if minor not in drbd_map:
996
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
997
                        minor)
998
            bad = True
999

    
1000
    return bad
1001

    
1002
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1003
                      node_instance, feedback_fn, n_offline):
1004
    """Verify an instance.
1005

1006
    This function checks to see if the required block devices are
1007
    available on the instance's node.
1008

1009
    """
1010
    bad = False
1011

    
1012
    node_current = instanceconfig.primary_node
1013

    
1014
    node_vol_should = {}
1015
    instanceconfig.MapLVsByNode(node_vol_should)
1016

    
1017
    for node in node_vol_should:
1018
      if node in n_offline:
1019
        # ignore missing volumes on offline nodes
1020
        continue
1021
      for volume in node_vol_should[node]:
1022
        if node not in node_vol_is or volume not in node_vol_is[node]:
1023
          feedback_fn("  - ERROR: volume %s missing on node %s" %
1024
                          (volume, node))
1025
          bad = True
1026

    
1027
    if instanceconfig.admin_up:
1028
      if ((node_current not in node_instance or
1029
          not instance in node_instance[node_current]) and
1030
          node_current not in n_offline):
1031
        feedback_fn("  - ERROR: instance %s not running on node %s" %
1032
                        (instance, node_current))
1033
        bad = True
1034

    
1035
    for node in node_instance:
1036
      if (not node == node_current):
1037
        if instance in node_instance[node]:
1038
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
1039
                          (instance, node))
1040
          bad = True
1041

    
1042
    return bad
1043

    
1044
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
1045
    """Verify if there are any unknown volumes in the cluster.
1046

1047
    The .os, .swap and backup volumes are ignored. All other volumes are
1048
    reported as unknown.
1049

1050
    """
1051
    bad = False
1052

    
1053
    for node in node_vol_is:
1054
      for volume in node_vol_is[node]:
1055
        if node not in node_vol_should or volume not in node_vol_should[node]:
1056
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
1057
                      (volume, node))
1058
          bad = True
1059
    return bad
1060

    
1061
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1062
    """Verify the list of running instances.
1063

1064
    This checks what instances are running but unknown to the cluster.
1065

1066
    """
1067
    bad = False
1068
    for node in node_instance:
1069
      for runninginstance in node_instance[node]:
1070
        if runninginstance not in instancelist:
1071
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
1072
                          (runninginstance, node))
1073
          bad = True
1074
    return bad
1075

    
1076
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1077
    """Verify N+1 Memory Resilience.
1078

1079
    Check that if one single node dies we can still start all the instances it
1080
    was primary for.
1081

1082
    """
1083
    bad = False
1084

    
1085
    for node, nodeinfo in node_info.iteritems():
1086
      # This code checks that every node which is now listed as secondary has
1087
      # enough memory to host all instances it is supposed to should a single
1088
      # other node in the cluster fail.
1089
      # FIXME: not ready for failover to an arbitrary node
1090
      # FIXME: does not support file-backed instances
1091
      # WARNING: we currently take into account down instances as well as up
1092
      # ones, considering that even if they're down someone might want to start
1093
      # them even in the event of a node failure.
1094
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1095
        needed_mem = 0
1096
        for instance in instances:
1097
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1098
          if bep[constants.BE_AUTO_BALANCE]:
1099
            needed_mem += bep[constants.BE_MEMORY]
1100
        if nodeinfo['mfree'] < needed_mem:
1101
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
1102
                      " failovers should node %s fail" % (node, prinode))
1103
          bad = True
1104
    return bad
1105

    
1106
  def CheckPrereq(self):
1107
    """Check prerequisites.
1108

1109
    Transform the list of checks we're going to skip into a set and check that
1110
    all its members are valid.
1111

1112
    """
1113
    self.skip_set = frozenset(self.op.skip_checks)
1114
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1115
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
1116

    
1117
  def BuildHooksEnv(self):
1118
    """Build hooks env.
1119

1120
    Cluster-Verify hooks just ran in the post phase and their failure makes
1121
    the output be logged in the verify output and the verification to fail.
1122

1123
    """
1124
    all_nodes = self.cfg.GetNodeList()
1125
    env = {
1126
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1127
      }
1128
    for node in self.cfg.GetAllNodesInfo().values():
1129
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1130

    
1131
    return env, [], all_nodes
1132

    
1133
  def Exec(self, feedback_fn):
1134
    """Verify integrity of cluster, performing various test on nodes.
1135

1136
    """
1137
    bad = False
1138
    feedback_fn("* Verifying global settings")
1139
    for msg in self.cfg.VerifyConfig():
1140
      feedback_fn("  - ERROR: %s" % msg)
1141

    
1142
    vg_name = self.cfg.GetVGName()
1143
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1144
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1145
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1146
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1147
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1148
                        for iname in instancelist)
1149
    i_non_redundant = [] # Non redundant instances
1150
    i_non_a_balanced = [] # Non auto-balanced instances
1151
    n_offline = [] # List of offline nodes
1152
    n_drained = [] # List of nodes being drained
1153
    node_volume = {}
1154
    node_instance = {}
1155
    node_info = {}
1156
    instance_cfg = {}
1157

    
1158
    # FIXME: verify OS list
1159
    # do local checksums
1160
    master_files = [constants.CLUSTER_CONF_FILE]
1161

    
1162
    file_names = ssconf.SimpleStore().GetFileList()
1163
    file_names.append(constants.SSL_CERT_FILE)
1164
    file_names.append(constants.RAPI_CERT_FILE)
1165
    file_names.extend(master_files)
1166

    
1167
    local_checksums = utils.FingerprintFiles(file_names)
1168

    
1169
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1170
    node_verify_param = {
1171
      constants.NV_FILELIST: file_names,
1172
      constants.NV_NODELIST: [node.name for node in nodeinfo
1173
                              if not node.offline],
1174
      constants.NV_HYPERVISOR: hypervisors,
1175
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1176
                                  node.secondary_ip) for node in nodeinfo
1177
                                 if not node.offline],
1178
      constants.NV_INSTANCELIST: hypervisors,
1179
      constants.NV_VERSION: None,
1180
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1181
      }
1182
    if vg_name is not None:
1183
      node_verify_param[constants.NV_VGLIST] = None
1184
      node_verify_param[constants.NV_LVLIST] = vg_name
1185
      node_verify_param[constants.NV_DRBDLIST] = None
1186
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1187
                                           self.cfg.GetClusterName())
1188

    
1189
    cluster = self.cfg.GetClusterInfo()
1190
    master_node = self.cfg.GetMasterNode()
1191
    all_drbd_map = self.cfg.ComputeDRBDMap()
1192

    
1193
    for node_i in nodeinfo:
1194
      node = node_i.name
1195

    
1196
      if node_i.offline:
1197
        feedback_fn("* Skipping offline node %s" % (node,))
1198
        n_offline.append(node)
1199
        continue
1200

    
1201
      if node == master_node:
1202
        ntype = "master"
1203
      elif node_i.master_candidate:
1204
        ntype = "master candidate"
1205
      elif node_i.drained:
1206
        ntype = "drained"
1207
        n_drained.append(node)
1208
      else:
1209
        ntype = "regular"
1210
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1211

    
1212
      msg = all_nvinfo[node].fail_msg
1213
      if msg:
1214
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1215
        bad = True
1216
        continue
1217

    
1218
      nresult = all_nvinfo[node].payload
1219
      node_drbd = {}
1220
      for minor, instance in all_drbd_map[node].items():
1221
        if instance not in instanceinfo:
1222
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1223
                      instance)
1224
          # ghost instance should not be running, but otherwise we
1225
          # don't give double warnings (both ghost instance and
1226
          # unallocated minor in use)
1227
          node_drbd[minor] = (instance, False)
1228
        else:
1229
          instance = instanceinfo[instance]
1230
          node_drbd[minor] = (instance.name, instance.admin_up)
1231
      result = self._VerifyNode(node_i, file_names, local_checksums,
1232
                                nresult, feedback_fn, master_files,
1233
                                node_drbd, vg_name)
1234
      bad = bad or result
1235

    
1236
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1237
      if vg_name is None:
1238
        node_volume[node] = {}
1239
      elif isinstance(lvdata, basestring):
1240
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1241
                    (node, utils.SafeEncode(lvdata)))
1242
        bad = True
1243
        node_volume[node] = {}
1244
      elif not isinstance(lvdata, dict):
1245
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1246
        bad = True
1247
        continue
1248
      else:
1249
        node_volume[node] = lvdata
1250

    
1251
      # node_instance
1252
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1253
      if not isinstance(idata, list):
1254
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1255
                    (node,))
1256
        bad = True
1257
        continue
1258

    
1259
      node_instance[node] = idata
1260

    
1261
      # node_info
1262
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1263
      if not isinstance(nodeinfo, dict):
1264
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1265
        bad = True
1266
        continue
1267

    
1268
      try:
1269
        node_info[node] = {
1270
          "mfree": int(nodeinfo['memory_free']),
1271
          "pinst": [],
1272
          "sinst": [],
1273
          # dictionary holding all instances this node is secondary for,
1274
          # grouped by their primary node. Each key is a cluster node, and each
1275
          # value is a list of instances which have the key as primary and the
1276
          # current node as secondary.  this is handy to calculate N+1 memory
1277
          # availability if you can only failover from a primary to its
1278
          # secondary.
1279
          "sinst-by-pnode": {},
1280
        }
1281
        # FIXME: devise a free space model for file based instances as well
1282
        if vg_name is not None:
1283
          if (constants.NV_VGLIST not in nresult or
1284
              vg_name not in nresult[constants.NV_VGLIST]):
1285
            feedback_fn("  - ERROR: node %s didn't return data for the"
1286
                        " volume group '%s' - it is either missing or broken" %
1287
                        (node, vg_name))
1288
            bad = True
1289
            continue
1290
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1291
      except (ValueError, KeyError):
1292
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
1293
                    " from node %s" % (node,))
1294
        bad = True
1295
        continue
1296

    
1297
    node_vol_should = {}
1298

    
1299
    for instance in instancelist:
1300
      feedback_fn("* Verifying instance %s" % instance)
1301
      inst_config = instanceinfo[instance]
1302
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1303
                                     node_instance, feedback_fn, n_offline)
1304
      bad = bad or result
1305
      inst_nodes_offline = []
1306

    
1307
      inst_config.MapLVsByNode(node_vol_should)
1308

    
1309
      instance_cfg[instance] = inst_config
1310

    
1311
      pnode = inst_config.primary_node
1312
      if pnode in node_info:
1313
        node_info[pnode]['pinst'].append(instance)
1314
      elif pnode not in n_offline:
1315
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1316
                    " %s failed" % (instance, pnode))
1317
        bad = True
1318

    
1319
      if pnode in n_offline:
1320
        inst_nodes_offline.append(pnode)
1321

    
1322
      # If the instance is non-redundant we cannot survive losing its primary
1323
      # node, so we are not N+1 compliant. On the other hand we have no disk
1324
      # templates with more than one secondary so that situation is not well
1325
      # supported either.
1326
      # FIXME: does not support file-backed instances
1327
      if len(inst_config.secondary_nodes) == 0:
1328
        i_non_redundant.append(instance)
1329
      elif len(inst_config.secondary_nodes) > 1:
1330
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1331
                    % instance)
1332

    
1333
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1334
        i_non_a_balanced.append(instance)
1335

    
1336
      for snode in inst_config.secondary_nodes:
1337
        if snode in node_info:
1338
          node_info[snode]['sinst'].append(instance)
1339
          if pnode not in node_info[snode]['sinst-by-pnode']:
1340
            node_info[snode]['sinst-by-pnode'][pnode] = []
1341
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1342
        elif snode not in n_offline:
1343
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1344
                      " %s failed" % (instance, snode))
1345
          bad = True
1346
        if snode in n_offline:
1347
          inst_nodes_offline.append(snode)
1348

    
1349
      if inst_nodes_offline:
1350
        # warn that the instance lives on offline nodes, and set bad=True
1351
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1352
                    ", ".join(inst_nodes_offline))
1353
        bad = True
1354

    
1355
    feedback_fn("* Verifying orphan volumes")
1356
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1357
                                       feedback_fn)
1358
    bad = bad or result
1359

    
1360
    feedback_fn("* Verifying remaining instances")
1361
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1362
                                         feedback_fn)
1363
    bad = bad or result
1364

    
1365
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1366
      feedback_fn("* Verifying N+1 Memory redundancy")
1367
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1368
      bad = bad or result
1369

    
1370
    feedback_fn("* Other Notes")
1371
    if i_non_redundant:
1372
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1373
                  % len(i_non_redundant))
1374

    
1375
    if i_non_a_balanced:
1376
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1377
                  % len(i_non_a_balanced))
1378

    
1379
    if n_offline:
1380
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1381

    
1382
    if n_drained:
1383
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1384

    
1385
    return not bad
1386

    
1387
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1388
    """Analyze the post-hooks' result
1389

1390
    This method analyses the hook result, handles it, and sends some
1391
    nicely-formatted feedback back to the user.
1392

1393
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1394
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1395
    @param hooks_results: the results of the multi-node hooks rpc call
1396
    @param feedback_fn: function used to send feedback back to the caller
1397
    @param lu_result: previous Exec result
1398
    @return: the new Exec result, based on the previous result
1399
        and hook results
1400

1401
    """
1402
    # We only really run POST phase hooks, and are only interested in
1403
    # their results
1404
    if phase == constants.HOOKS_PHASE_POST:
1405
      # Used to change hooks' output to proper indentation
1406
      indent_re = re.compile('^', re.M)
1407
      feedback_fn("* Hooks Results")
1408
      if not hooks_results:
1409
        feedback_fn("  - ERROR: general communication failure")
1410
        lu_result = 1
1411
      else:
1412
        for node_name in hooks_results:
1413
          show_node_header = True
1414
          res = hooks_results[node_name]
1415
          msg = res.fail_msg
1416
          if msg:
1417
            if res.offline:
1418
              # no need to warn or set fail return value
1419
              continue
1420
            feedback_fn("    Communication failure in hooks execution: %s" %
1421
                        msg)
1422
            lu_result = 1
1423
            continue
1424
          for script, hkr, output in res.payload:
1425
            if hkr == constants.HKR_FAIL:
1426
              # The node header is only shown once, if there are
1427
              # failing hooks on that node
1428
              if show_node_header:
1429
                feedback_fn("  Node %s:" % node_name)
1430
                show_node_header = False
1431
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1432
              output = indent_re.sub('      ', output)
1433
              feedback_fn("%s" % output)
1434
              lu_result = 1
1435

    
1436
      return lu_result
1437

    
1438

    
1439


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
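# Illustrative note (hypothetical node/instance/volume names): the 3-tuple
# returned by LUVerifyDisks.Exec looks like
#   ({"node2.example.com": "rpc failure"},                # per-node errors
#    ["instance1.example.com"],                           # need activate-disks
#    {"instance2.example.com": [("node3", "xenvg/disk0")]})  # missing LVs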


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
      if result.failed:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance)
          changed.append((instance.name, idx, size))
    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
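# Example (hypothetical disk layout): for a DRBD8 disk whose children are two
# LV devices, _RecursiveCheckIfLVMBased returns True because the recursion
# finds an LD_LV dev_type among the children.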


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member")
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" %
                                   utils.CommaJoin(invalid_hvs))
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self)

    self.cfg.Update(self.cluster)
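# Note: in LUSetClusterParams.CheckPrereq the new be/nic/hv parameter dicts are
# built with objects.FillDict(cluster_defaults, opcode_values), i.e. the values
# passed in the opcode override the stored cluster-wide defaults and any key
# not mentioned keeps its current default.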


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)
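# Note: _RedistributeAncillaryFiles only uploads files that exist on the
# master (the os.path.exists check), so optional files such as the RAPI users
# file are silently skipped when absent; per-node upload failures are logged
# as warnings and do not abort the operation.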


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
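# Note: _WaitForSync returns True only if no disk was still flagged as
# degraded (without an active resync) when polling stopped; a failed status
# RPC is retried up to 10 times, sleeping 6 seconds between attempts, before
# a RemoteError is raised.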


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
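# Note: _CheckDiskConsistency recurses into child devices, but the ldisk flag
# is only applied to the top-level device; children are always checked against
# is_degraded.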


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "")]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
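# Note: in LUDiagnoseOS.Exec the "valid" field is True only if every node that
# answered the RPC reported at least one installation of the OS and the first
# reported installation has a True status (osl[0][1]).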


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    if self.op.node_name in all_nodes:
      all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    finally:
      res = h_results[node.name]
      if res.fail_msg:
        if not res.offline:
          self.LogError("Failed to start hooks on %s: %s" %
                        (node.name, res.fail_msg))
      for script, hkr, output in res.payload:
        if hkr != constants.HKR_FAIL:
          continue
        if output:
          self.LogWarning("On %s script %s failed, output:  %s" %
                          (node.name, script, output))
        else:
          self.LogWarning("On %s script %s failed (no output)." %
                          (node.name, script))

    result = self.rpc.call_node_leave_cluster(node.name)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no", "ctime", "mtime",
    "master_candidate",
    "master",
    "offline",
    "drained",
    "role",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "ctime":
          val = node.ctime
        elif field == "mtime":
          val = node.mtime
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
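# Note: the "role" field computed by LUQueryNodes uses single-letter codes:
# "M" master, "C" master candidate, "D" drained, "O" offline, "R" regular.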


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    storage_type = self.op.storage_type

    if storage_type not in constants.VALID_STORAGE_FIELDS:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)

    dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*dynamic_fields),
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.op.name = getattr(self.op, "name", None)

    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node as it's only known to the LU
    while "node" in fields:
      fields.remove("node")

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
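# Note: LUQueryNodeStorage always requests the storage name field from the
# nodes (adding constants.SF_NAME when the caller did not ask for it) so that
# the per-node rows can be sorted by name before building the output.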


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)

    self.op.node_name = node_name

    storage_type = self.op.storage_type
    if storage_type not in constants.VALID_STORAGE_FIELDS:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)))

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
    # the new node will increase mc_max with one, so:
    mc_max = min(mc_max + 1, cp_size)
    self.master_candidate = mc_now < mc_max

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])
    result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload['nodelist']
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node)
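# Note: when LUAddNode runs with readd=True it reuses the existing node object
# and clears the offline/drained flags early, so the RPC calls made later in
# Exec are not blocked by a stale offline status.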
class LUSetNodeParams(LogicalUnit):
2858
  """Modifies the parameters of a node.
2859

2860
  """
2861
  HPATH = "node-modify"
2862
  HTYPE = constants.HTYPE_NODE
2863
  _OP_REQP = ["node_name"]
2864
  REQ_BGL = False
2865

    
2866
  def CheckArguments(self):
2867
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2868
    if node_name is None:
2869
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2870
    self.op.node_name = node_name
2871
    _CheckBooleanOpField(self.op, 'master_candidate')
2872
    _CheckBooleanOpField(self.op, 'offline')
2873
    _CheckBooleanOpField(self.op, 'drained')
2874
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2875
    if all_mods.count(None) == 3:
2876
      raise errors.OpPrereqError("Please pass at least one modification")
2877
    if all_mods.count(True) > 1:
2878
      raise errors.OpPrereqError("Can't set the node into more than one"
2879
                                 " state at the same time")
2880

    
2881
  def ExpandNames(self):
2882
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2883

    
2884
  def BuildHooksEnv(self):
2885
    """Build hooks env.
2886

2887
    This runs on the master node.
2888

2889
    """
2890
    env = {
2891
      "OP_TARGET": self.op.node_name,
2892
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2893
      "OFFLINE": str(self.op.offline),
2894
      "DRAINED": str(self.op.drained),
2895
      }
2896
    nl = [self.cfg.GetMasterNode(),
2897
          self.op.node_name]
2898
    return env, nl, nl
2899

    
2900
  def CheckPrereq(self):
2901
    """Check prerequisites.
2902

2903
    This only checks the instance list against the existing names.
2904

2905
    """
2906
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2907

    
2908
    if ((self.op.master_candidate == False or self.op.offline == True or
2909
         self.op.drained == True) and node.master_candidate):
2910
      # we will demote the node from master_candidate
2911
      if self.op.node_name == self.cfg.GetMasterNode():
2912
        raise errors.OpPrereqError("The master node has to be a"
2913
                                   " master candidate, online and not drained")
2914
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2915
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2916
      if num_candidates <= cp_size:
2917
        msg = ("Not enough master candidates (desired"
2918
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2919
        if self.op.force:
2920
          self.LogWarning(msg)
2921
        else:
2922
          raise errors.OpPrereqError(msg)
2923

    
2924
    if (self.op.master_candidate == True and
2925
        ((node.offline and not self.op.offline == False) or
2926
         (node.drained and not self.op.drained == False))):
2927
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2928
                                 " to master_candidate" % node.name)
2929

    
2930
    return
2931

    
2932
  def Exec(self, feedback_fn):
2933
    """Modifies a node.
2934

2935
    """
2936
    node = self.node
2937

    
2938
    result = []
2939
    changed_mc = False
2940

    
2941
    if self.op.offline is not None:
2942
      node.offline = self.op.offline
2943
      result.append(("offline", str(self.op.offline)))
2944
      if self.op.offline == True:
2945
        if node.master_candidate:
2946
          node.master_candidate = False
2947
          changed_mc = True
2948
          result.append(("master_candidate", "auto-demotion due to offline"))
2949
        if node.drained:
2950
          node.drained = False
2951
          result.append(("drained", "clear drained status due to offline"))
2952

    
2953
    if self.op.master_candidate is not None:
2954
      node.master_candidate = self.op.master_candidate
2955
      changed_mc = True
2956
      result.append(("master_candidate", str(self.op.master_candidate)))
2957
      if self.op.master_candidate == False:
2958
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2959
        msg = rrc.fail_msg
2960
        if msg:
2961
          self.LogWarning("Node failed to demote itself: %s" % msg)
2962

    
2963
    if self.op.drained is not None:
2964
      node.drained = self.op.drained
2965
      result.append(("drained", str(self.op.drained)))
2966
      if self.op.drained == True:
2967
        if node.master_candidate:
2968
          node.master_candidate = False
2969
          changed_mc = True
2970
          result.append(("master_candidate", "auto-demotion due to drain"))
2971
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2972
          msg = rrc.RemoteFailMsg()
2973
          if msg:
2974
            self.LogWarning("Node failed to demote itself: %s" % msg)
2975
        if node.offline:
2976
          node.offline = False
2977
          result.append(("offline", "clear offline status due to drain"))
2978

    
2979
    # this will trigger configuration file update, if needed
2980
    self.cfg.Update(node)
2981
    # this will trigger job queue propagation or cleanup
2982
    if changed_mc:
2983
      self.context.ReaddNode(node)
2984

    
2985
    return result
2986
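
# Note: the three per-node flags handled above are tri-state: None leaves a
# flag untouched, True/False set or clear it, and CheckArguments rejects
# requests that try to enable more than one of offline/drained/
# master_candidate at once.  A minimal sketch of a valid modification
# (illustrative only, assuming the usual OpSetNodeParams opcode fields):
#
#   op = opcodes.OpSetNodeParams(node_name="node3.example.com",
#                                drained=True, offline=None,
#                                master_candidate=None, force=False)
#
# Draining a master candidate this way triggers the auto-demotion done in
# Exec() above.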


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute and return the requested configuration values.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a pair (disks_ok, device_info); device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples with
      the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
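
# Note: the second element of the returned pair holds one entry per instance
# disk, built on the primary node as
#
#   (instance.primary_node, inst_disk.iv_name, result.payload)
#
# i.e. roughly ("node1.example.com", "disk/0", "/dev/drbd0") for a DRBD disk;
# the exact third value depends on what call_blockdev_assemble returns as its
# payload, so treat that example as a sketch only.  Callers that only care
# about success, such as _StartInstanceDisks below, discard it.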


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  when computing the return value (the shutdown is still attempted
  on every node).

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result
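
# Note: a failed blockdev shutdown is always logged, but it only flips the
# return value to False when it happened on a non-primary node or when
# ignore_primary is False.  In other words, ignore_primary=True makes
# primary-node errors non-fatal for the caller while still attempting the
# shutdown everywhere.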


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
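
# Note (illustrative): with requested=2048 and a node whose call_node_info
# payload reports memory_free=1024, the check above raises
# OpPrereqError("Not enough memory on node ...: needed 2048 MiB, available
# 1024 MiB").  A non-integer memory_free value (e.g. when the hypervisor
# query failed on the node) is rejected before the comparison is attempted.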


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
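
# Note: per-startup hvparams are only syntax-checked after being layered on
# top of the existing settings, cluster defaults first, then the instance's
# own hvparams, then the one-off overrides passed in the opcode:
#
#   filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
#                                 instance.hvparams)
#   filled_hvp.update(self.hvparams)
#
# so a temporary override never has to repeat the parameters it does not
# change.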


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
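
# Note: soft and hard reboots are delegated to the node daemon via
# call_instance_reboot, while a "full" reboot is emulated on the master side
# as shutdown + disk deactivation + disk activation + start; that is why only
# the full variant honours ignore_secondaries when restarting the disks.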


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURecreateInstanceDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    if not isinstance(self.op.disks, list):
      raise errors.OpPrereqError("Invalid disks parameter")
    for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item))

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))
    else:
      for idx in self.op.disks:
        if idx >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    to_skip = []
    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.op.disks: # disk idx has not been passed in
        to_skip.append(idx)
        continue

    _CreateDisks(self, self.instance, to_skip=to_skip)
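
# Note: op.disks lists the indices to recreate; every other index ends up in
# to_skip and is left untouched by _CreateDisks.  An empty op.disks list is
# expanded in CheckPrereq to all disk indices, so "recreate everything" is
# the default behaviour.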


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
                                    r"(nic)\.(bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|modes|links|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",
                                    "ctime", "mtime",
                                    ] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.