
root / lib / cmdlib.py @ 56aa9fd5


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring the
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing these separately is better because:
118

119
      - ExpandNames is left as purely a lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods no longer need to worry about missing parameters.
125

126
    """
127
    pass
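  # Illustrative sketch (not part of the original code; the "ignore_failures"
  # attribute is only an example): a typical override of CheckArguments
  # validates and normalizes optional opcode parameters:
  #
  #   def CheckArguments(self):
  #     if not hasattr(self.op, "ignore_failures"):
  #       self.op.ignore_failures = False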
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, etc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
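  # Illustrative sketch (not part of the original code): an LU that only reads
  # node data could declare all node locks in shared mode in ExpandNames:
  #
  #   self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
  #   self.share_locks[locking.LEVEL_NODE] = 1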
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not have the 'GANETI_' prefix, as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If there are no nodes, an empty list (and not None) should be returned.
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
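  # Illustrative sketch (not part of the original code): a typical
  # implementation returns the environment plus the pre- and post-hook node
  # lists, e.g. running only on the master node:
  #
  #   env = {"OP_TARGET": self.cfg.GetClusterName()}
  #   mn = self.cfg.GetMasterNode()
  #   return env, [mn], [mn]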
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged, but any LU can override it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
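  # Illustrative sketch (not part of the original code): LUs that operate on a
  # single instance usually call the helper above from their ExpandNames and
  # have the node locks computed later via DeclareLocks:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE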
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to lock just some instances' nodes, or
295
    to just lock primaries or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we really have been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = lu.cfg.GetInstanceList()
396
  return utils.NiceSort(wanted)
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419
                          memory, vcpus, nics):
420
  """Builds instance related env variables for hooks
421

422
  This builds the hook environment from individual variables.
423

424
  @type name: string
425
  @param name: the name of the instance
426
  @type primary_node: string
427
  @param primary_node: the name of the instance's primary node
428
  @type secondary_nodes: list
429
  @param secondary_nodes: list of secondary nodes as strings
430
  @type os_type: string
431
  @param os_type: the name of the instance's OS
432
  @type status: string
433
  @param status: the desired status of the instance
434
  @type memory: string
435
  @param memory: the memory size of the instance
436
  @type vcpus: string
437
  @param vcpus: the count of VCPUs the instance has
438
  @type nics: list
439
  @param nics: list of tuples (ip, bridge, mac) representing
440
      the NICs the instance has
441
  @rtype: dict
442
  @return: the hook environment for this instance
443

444
  """
445
  env = {
446
    "OP_TARGET": name,
447
    "INSTANCE_NAME": name,
448
    "INSTANCE_PRIMARY": primary_node,
449
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450
    "INSTANCE_OS_TYPE": os_type,
451
    "INSTANCE_STATUS": status,
452
    "INSTANCE_MEMORY": memory,
453
    "INSTANCE_VCPUS": vcpus,
454
  }
455

    
456
  if nics:
457
    nic_count = len(nics)
458
    for idx, (ip, bridge, mac) in enumerate(nics):
459
      if ip is None:
460
        ip = ""
461
      env["INSTANCE_NIC%d_IP" % idx] = ip
462
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464
  else:
465
    nic_count = 0
466

    
467
  env["INSTANCE_NIC_COUNT"] = nic_count
468

    
469
  return env
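# Illustrative example (not part of the original code; all values are made
# up): for an instance with one NIC, _BuildInstanceHookEnv produces an
# environment such as:
#
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com", "INSTANCE_SECONDARIES": "",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1, "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01"}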
470

    
471

    
472
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473
  """Builds instance related env variables for hooks from an object.
474

475
  @type lu: L{LogicalUnit}
476
  @param lu: the logical unit on whose behalf we execute
477
  @type instance: L{objects.Instance}
478
  @param instance: the instance for which we should build the
479
      environment
480
  @type override: dict
481
  @param override: dictionary with key/values that will override
482
      our values
483
  @rtype: dict
484
  @return: the hook environment dictionary
485

486
  """
487
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
488
  args = {
489
    'name': instance.name,
490
    'primary_node': instance.primary_node,
491
    'secondary_nodes': instance.secondary_nodes,
492
    'os_type': instance.os,
493
    'status': instance.status,
494
    'memory': bep[constants.BE_MEMORY],
495
    'vcpus': bep[constants.BE_VCPUS],
496
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497
  }
498
  if override:
499
    args.update(override)
500
  return _BuildInstanceHookEnv(**args)
501

    
502

    
503
def _CheckInstanceBridgesExist(lu, instance):
504
  """Check that the brigdes needed by an instance exist.
505

506
  """
507
  # check bridges existence
508
  brlist = [nic.bridge for nic in instance.nics]
509
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510
  result.Raise()
511
  if not result.data:
512
    raise errors.OpPrereqError("One or more target bridges %s does not"
513
                               " exist on destination node '%s'" %
514
                               (brlist, instance.primary_node))
515

    
516

    
517
class LUDestroyCluster(NoHooksLU):
518
  """Logical unit for destroying the cluster.
519

520
  """
521
  _OP_REQP = []
522

    
523
  def CheckPrereq(self):
524
    """Check prerequisites.
525

526
    This checks whether the cluster is empty.
527

528
    Any errors are signalled by raising errors.OpPrereqError.
529

530
    """
531
    master = self.cfg.GetMasterNode()
532

    
533
    nodelist = self.cfg.GetNodeList()
534
    if len(nodelist) != 1 or nodelist[0] != master:
535
      raise errors.OpPrereqError("There are still %d node(s) in"
536
                                 " this cluster." % (len(nodelist) - 1))
537
    instancelist = self.cfg.GetInstanceList()
538
    if instancelist:
539
      raise errors.OpPrereqError("There are still %d instance(s) in"
540
                                 " this cluster." % len(instancelist))
541

    
542
  def Exec(self, feedback_fn):
543
    """Destroys the cluster.
544

545
    """
546
    master = self.cfg.GetMasterNode()
547
    result = self.rpc.call_node_stop_master(master, False)
548
    result.Raise()
549
    if not result.data:
550
      raise errors.OpExecError("Could not disable the master role")
551
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552
    utils.CreateBackup(priv_key)
553
    utils.CreateBackup(pub_key)
554
    return master
555

    
556

    
557
class LUVerifyCluster(LogicalUnit):
558
  """Verifies the cluster status.
559

560
  """
561
  HPATH = "cluster-verify"
562
  HTYPE = constants.HTYPE_CLUSTER
563
  _OP_REQP = ["skip_checks"]
564
  REQ_BGL = False
565

    
566
  def ExpandNames(self):
567
    self.needed_locks = {
568
      locking.LEVEL_NODE: locking.ALL_SET,
569
      locking.LEVEL_INSTANCE: locking.ALL_SET,
570
    }
571
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572

    
573
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574
                  node_result, feedback_fn, master_files):
575
    """Run multiple tests against a node.
576

577
    Test list:
578

579
      - compares ganeti version
580
      - checks vg existence and size > 20G
581
      - checks config file checksum
582
      - checks ssh to other nodes
583

584
    @type nodeinfo: L{objects.Node}
585
    @param nodeinfo: the node to check
586
    @param file_list: required list of files
587
    @param local_cksum: dictionary of local files and their checksums
588
    @param node_result: the results from the node
589
    @param feedback_fn: function used to accumulate results
590
    @param master_files: list of files that only masters should have
591

592
    """
593
    node = nodeinfo.name
594

    
595
    # main result, node_result should be a non-empty dict
596
    if not node_result or not isinstance(node_result, dict):
597
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598
      return True
599

    
600
    # compares ganeti version
601
    local_version = constants.PROTOCOL_VERSION
602
    remote_version = node_result.get('version', None)
603
    if not remote_version:
604
      feedback_fn("  - ERROR: connection to %s failed" % (node))
605
      return True
606

    
607
    if local_version != remote_version:
608
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609
                      (local_version, node, remote_version))
610
      return True
611

    
612
    # checks vg existence and size > 20G
613

    
614
    bad = False
615
    vglist = node_result.get(constants.NV_VGLIST, None)
616
    if not vglist:
617
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618
                      (node,))
619
      bad = True
620
    else:
621
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622
                                            constants.MIN_VG_SIZE)
623
      if vgstatus:
624
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625
        bad = True
626

    
627
    # checks config file checksum
628

    
629
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
630
    if not isinstance(remote_cksum, dict):
631
      bad = True
632
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
633
    else:
634
      for file_name in file_list:
635
        node_is_mc = nodeinfo.master_candidate
636
        must_have_file = file_name not in master_files
637
        if file_name not in remote_cksum:
638
          if node_is_mc or must_have_file:
639
            bad = True
640
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
641
        elif remote_cksum[file_name] != local_cksum[file_name]:
642
          if node_is_mc or must_have_file:
643
            bad = True
644
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645
          else:
646
            # not candidate and this is not a must-have file
647
            bad = True
648
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649
                        " '%s'" % file_name)
650
        else:
651
          # all good, except non-master/non-must have combination
652
          if not node_is_mc and not must_have_file:
653
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
654
                        " candidates" % file_name)
655

    
656
    # checks ssh to any
657

    
658
    if constants.NV_NODELIST not in node_result:
659
      bad = True
660
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661
    else:
662
      if node_result[constants.NV_NODELIST]:
663
        bad = True
664
        for node in node_result[constants.NV_NODELIST]:
665
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666
                          (node, node_result[constants.NV_NODELIST][node]))
667

    
668
    if constants.NV_NODENETTEST not in node_result:
669
      bad = True
670
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671
    else:
672
      if node_result[constants.NV_NODENETTEST]:
673
        bad = True
674
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675
        for node in nlist:
676
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677
                          (node, node_result[constants.NV_NODENETTEST][node]))
678

    
679
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680
    if isinstance(hyp_result, dict):
681
      for hv_name, hv_result in hyp_result.iteritems():
682
        if hv_result is not None:
683
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684
                      (hv_name, hv_result))
685
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if instanceconfig.status != 'down':
710
      if (node_current not in node_instance or
711
          not instance in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if node != node_current:
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758
    """Verify N+1 Memory Resilience.
759

760
    Check that if one single node dies we can still start all the instances it
761
    was primary for.
762

763
    """
764
    bad = False
765

    
766
    for node, nodeinfo in node_info.iteritems():
767
      # This code checks that every node which is now listed as secondary has
768
      # enough memory to host all instances it is supposed to, should a single
769
      # other node in the cluster fail.
770
      # FIXME: not ready for failover to an arbitrary node
771
      # FIXME: does not support file-backed instances
772
      # WARNING: we currently take into account down instances as well as up
773
      # ones, considering that even if they're down someone might want to start
774
      # them even in the event of a node failure.
775
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776
        needed_mem = 0
777
        for instance in instances:
778
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779
          if bep[constants.BE_AUTO_BALANCE]:
780
            needed_mem += bep[constants.BE_MEMORY]
781
        if nodeinfo['mfree'] < needed_mem:
782
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
783
                      " failovers should node %s fail" % (node, prinode))
784
          bad = True
785
    return bad
786

    
787
  def CheckPrereq(self):
788
    """Check prerequisites.
789

790
    Transform the list of checks we're going to skip into a set and check that
791
    all its members are valid.
792

793
    """
794
    self.skip_set = frozenset(self.op.skip_checks)
795
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
797

    
798
  def BuildHooksEnv(self):
799
    """Build hooks env.
800

801
    Cluster-Verify hooks are run only in the post phase, and their failure causes
802
    the output to be logged in the verify output and the verification to fail.
803

804
    """
805
    all_nodes = self.cfg.GetNodeList()
806
    # TODO: populate the environment with useful information for verify hooks
807
    env = {}
808
    return env, [], all_nodes
809

    
810
  def Exec(self, feedback_fn):
811
    """Verify integrity of cluster, performing various test on nodes.
812

813
    """
814
    bad = False
815
    feedback_fn("* Verifying global settings")
816
    for msg in self.cfg.VerifyConfig():
817
      feedback_fn("  - ERROR: %s" % msg)
818

    
819
    vg_name = self.cfg.GetVGName()
820
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
822
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824
    i_non_redundant = [] # Non redundant instances
825
    i_non_a_balanced = [] # Non auto-balanced instances
826
    node_volume = {}
827
    node_instance = {}
828
    node_info = {}
829
    instance_cfg = {}
830

    
831
    # FIXME: verify OS list
832
    # do local checksums
833
    master_files = [constants.CLUSTER_CONF_FILE]
834

    
835
    file_names = ssconf.SimpleStore().GetFileList()
836
    file_names.append(constants.SSL_CERT_FILE)
837
    file_names.extend(master_files)
838

    
839
    local_checksums = utils.FingerprintFiles(file_names)
840

    
841
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842
    node_verify_param = {
843
      constants.NV_FILELIST: file_names,
844
      constants.NV_NODELIST: nodelist,
845
      constants.NV_HYPERVISOR: hypervisors,
846
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847
                                  node.secondary_ip) for node in nodeinfo],
848
      constants.NV_LVLIST: vg_name,
849
      constants.NV_INSTANCELIST: hypervisors,
850
      constants.NV_VGLIST: None,
851
      constants.NV_VERSION: None,
852
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853
      }
854
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855
                                           self.cfg.GetClusterName())
856

    
857
    cluster = self.cfg.GetClusterInfo()
858
    master_node = self.cfg.GetMasterNode()
859
    for node_i in nodeinfo:
860
      node = node_i.name
861
      nresult = all_nvinfo[node].data
862

    
863
      if node == master_node:
864
        ntype = "master"
865
      elif node_i.master_candidate:
866
        ntype = "master candidate"
867
      else:
868
        ntype = "regular"
869
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870

    
871
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
872
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
873
        bad = True
874
        continue
875

    
876
      result = self._VerifyNode(node_i, file_names, local_checksums,
877
                                nresult, feedback_fn, master_files)
878
      bad = bad or result
879

    
880
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881
      if isinstance(lvdata, basestring):
882
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883
                    (node, lvdata.encode('string_escape')))
884
        bad = True
885
        node_volume[node] = {}
886
      elif not isinstance(lvdata, dict):
887
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888
        bad = True
889
        continue
890
      else:
891
        node_volume[node] = lvdata
892

    
893
      # node_instance
894
      idata = nresult.get(constants.NV_INSTANCELIST, None)
895
      if not isinstance(idata, list):
896
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897
                    (node,))
898
        bad = True
899
        continue
900

    
901
      node_instance[node] = idata
902

    
903
      # node_info
904
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
905
      if not isinstance(nodeinfo, dict):
906
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907
        bad = True
908
        continue
909

    
910
      try:
911
        node_info[node] = {
912
          "mfree": int(nodeinfo['memory_free']),
913
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914
          "pinst": [],
915
          "sinst": [],
916
          # dictionary holding all instances this node is secondary for,
917
          # grouped by their primary node. Each key is a cluster node, and each
918
          # value is a list of instances which have the key as primary and the
919
          # current node as secondary.  this is handy to calculate N+1 memory
920
          # availability if you can only failover from a primary to its
921
          # secondary.
922
          "sinst-by-pnode": {},
923
        }
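        # Illustrative example (not part of the original code): a node that is
        # secondary for inst1 and inst2 (primary nodeA) and for inst3 (primary
        # nodeB) ends up with:
        #   "sinst-by-pnode": {"nodeA": ["inst1", "inst2"],
        #                      "nodeB": ["inst3"]}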
924
      except ValueError:
925
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926
        bad = True
927
        continue
928

    
929
    node_vol_should = {}
930

    
931
    for instance in instancelist:
932
      feedback_fn("* Verifying instance %s" % instance)
933
      inst_config = self.cfg.GetInstanceInfo(instance)
934
      result =  self._VerifyInstance(instance, inst_config, node_volume,
935
                                     node_instance, feedback_fn)
936
      bad = bad or result
937

    
938
      inst_config.MapLVsByNode(node_vol_should)
939

    
940
      instance_cfg[instance] = inst_config
941

    
942
      pnode = inst_config.primary_node
943
      if pnode in node_info:
944
        node_info[pnode]['pinst'].append(instance)
945
      else:
946
        feedback_fn("  - ERROR: instance %s, connection to primary node"
947
                    " %s failed" % (instance, pnode))
948
        bad = True
949

    
950
      # If the instance is non-redundant we cannot survive losing its primary
951
      # node, so we are not N+1 compliant. On the other hand we have no disk
952
      # templates with more than one secondary so that situation is not well
953
      # supported either.
954
      # FIXME: does not support file-backed instances
955
      if len(inst_config.secondary_nodes) == 0:
956
        i_non_redundant.append(instance)
957
      elif len(inst_config.secondary_nodes) > 1:
958
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
959
                    % instance)
960

    
961
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962
        i_non_a_balanced.append(instance)
963

    
964
      for snode in inst_config.secondary_nodes:
965
        if snode in node_info:
966
          node_info[snode]['sinst'].append(instance)
967
          if pnode not in node_info[snode]['sinst-by-pnode']:
968
            node_info[snode]['sinst-by-pnode'][pnode] = []
969
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970
        else:
971
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
972
                      " %s failed" % (instance, snode))
973

    
974
    feedback_fn("* Verifying orphan volumes")
975
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976
                                       feedback_fn)
977
    bad = bad or result
978

    
979
    feedback_fn("* Verifying remaining instances")
980
    result = self._VerifyOrphanInstances(instancelist, node_instance,
981
                                         feedback_fn)
982
    bad = bad or result
983

    
984
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985
      feedback_fn("* Verifying N+1 Memory redundancy")
986
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987
      bad = bad or result
988

    
989
    feedback_fn("* Other Notes")
990
    if i_non_redundant:
991
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992
                  % len(i_non_redundant))
993

    
994
    if i_non_a_balanced:
995
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996
                  % len(i_non_a_balanced))
997

    
998
    return not bad
999

    
1000
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001
    """Analize the post-hooks' result
1002

1003
    This method analyzes the hook result, handles it, and sends some
1004
    nicely-formatted feedback back to the user.
1005

1006
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008
    @param hooks_results: the results of the multi-node hooks rpc call
1009
    @param feedback_fn: function used to send feedback back to the caller
1010
    @param lu_result: previous Exec result
1011
    @return: the new Exec result, based on the previous result
1012
        and hook results
1013

1014
    """
1015
    # We only really run POST phase hooks, and are only interested in
1016
    # their results
1017
    if phase == constants.HOOKS_PHASE_POST:
1018
      # Used to change hooks' output to proper indentation
1019
      indent_re = re.compile('^', re.M)
1020
      feedback_fn("* Hooks Results")
1021
      if not hooks_results:
1022
        feedback_fn("  - ERROR: general communication failure")
1023
        lu_result = 1
1024
      else:
1025
        for node_name in hooks_results:
1026
          show_node_header = True
1027
          res = hooks_results[node_name]
1028
          if res.failed or res.data is False or not isinstance(res.data, list):
1029
            feedback_fn("    Communication failure in hooks execution")
1030
            lu_result = 1
1031
            continue
1032
          for script, hkr, output in res.data:
1033
            if hkr == constants.HKR_FAIL:
1034
              # The node header is only shown once, if there are
1035
              # failing hooks on that node
1036
              if show_node_header:
1037
                feedback_fn("  Node %s:" % node_name)
1038
                show_node_header = False
1039
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1040
              output = indent_re.sub('      ', output)
1041
              feedback_fn("%s" % output)
1042
              lu_result = 1
1043

    
1044
      return lu_result
1045

    
1046

    
1047
class LUVerifyDisks(NoHooksLU):
1048
  """Verifies the cluster disks status.
1049

1050
  """
1051
  _OP_REQP = []
1052
  REQ_BGL = False
1053

    
1054
  def ExpandNames(self):
1055
    self.needed_locks = {
1056
      locking.LEVEL_NODE: locking.ALL_SET,
1057
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1058
    }
1059
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060

    
1061
  def CheckPrereq(self):
1062
    """Check prerequisites.
1063

1064
    This has no prerequisites.
1065

1066
    """
1067
    pass
1068

    
1069
  def Exec(self, feedback_fn):
1070
    """Verify integrity of cluster disks.
1071

1072
    """
1073
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074

    
1075
    vg_name = self.cfg.GetVGName()
1076
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1077
    instances = [self.cfg.GetInstanceInfo(name)
1078
                 for name in self.cfg.GetInstanceList()]
1079

    
1080
    nv_dict = {}
1081
    for inst in instances:
1082
      inst_lvs = {}
1083
      if (inst.status != "up" or
1084
          inst.disk_template not in constants.DTS_NET_MIRROR):
1085
        continue
1086
      inst.MapLVsByNode(inst_lvs)
1087
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088
      for node, vol_list in inst_lvs.iteritems():
1089
        for vol in vol_list:
1090
          nv_dict[(node, vol)] = inst
1091

    
1092
    if not nv_dict:
1093
      return result
1094

    
1095
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096

    
1097
    to_act = set()
1098
    for node in nodes:
1099
      # node_volume
1100
      lvs = node_lvs[node]
1101
      if lvs.failed:
1102
        self.LogWarning("Connection to node %s failed: %s" %
1103
                        (node, lvs.data))
1104
        continue
1105
      lvs = lvs.data
1106
      if isinstance(lvs, basestring):
1107
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108
        res_nlvm[node] = lvs
1109
      elif not isinstance(lvs, dict):
1110
        logging.warning("Connection to node %s failed or invalid data"
1111
                        " returned", node)
1112
        res_nodes.append(node)
1113
        continue
1114

    
1115
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116
        inst = nv_dict.pop((node, lv_name), None)
1117
        if (not lv_online and inst is not None
1118
            and inst.name not in res_instances):
1119
          res_instances.append(inst.name)
1120

    
1121
    # any leftover items in nv_dict are missing LVs, let's arrange the
1122
    # data better
1123
    for key, inst in nv_dict.iteritems():
1124
      if inst.name not in res_missing:
1125
        res_missing[inst.name] = []
1126
      res_missing[inst.name].append(key)
1127

    
1128
    return result
1129

    
1130

    
1131
class LURenameCluster(LogicalUnit):
1132
  """Rename the cluster.
1133

1134
  """
1135
  HPATH = "cluster-rename"
1136
  HTYPE = constants.HTYPE_CLUSTER
1137
  _OP_REQP = ["name"]
1138

    
1139
  def BuildHooksEnv(self):
1140
    """Build hooks env.
1141

1142
    """
1143
    env = {
1144
      "OP_TARGET": self.cfg.GetClusterName(),
1145
      "NEW_NAME": self.op.name,
1146
      }
1147
    mn = self.cfg.GetMasterNode()
1148
    return env, [mn], [mn]
1149

    
1150
  def CheckPrereq(self):
1151
    """Verify that the passed name is a valid one.
1152

1153
    """
1154
    hostname = utils.HostInfo(self.op.name)
1155

    
1156
    new_name = hostname.name
1157
    self.ip = new_ip = hostname.ip
1158
    old_name = self.cfg.GetClusterName()
1159
    old_ip = self.cfg.GetMasterIP()
1160
    if new_name == old_name and new_ip == old_ip:
1161
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162
                                 " cluster has changed")
1163
    if new_ip != old_ip:
1164
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166
                                   " reachable on the network. Aborting." %
1167
                                   new_ip)
1168

    
1169
    self.op.name = new_name
1170

    
1171
  def Exec(self, feedback_fn):
1172
    """Rename the cluster.
1173

1174
    """
1175
    clustername = self.op.name
1176
    ip = self.ip
1177

    
1178
    # shutdown the master IP
1179
    master = self.cfg.GetMasterNode()
1180
    result = self.rpc.call_node_stop_master(master, False)
1181
    if result.failed or not result.data:
1182
      raise errors.OpExecError("Could not disable the master role")
1183

    
1184
    try:
1185
      cluster = self.cfg.GetClusterInfo()
1186
      cluster.cluster_name = clustername
1187
      cluster.master_ip = ip
1188
      self.cfg.Update(cluster)
1189

    
1190
      # update the known hosts file
1191
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192
      node_list = self.cfg.GetNodeList()
1193
      try:
1194
        node_list.remove(master)
1195
      except ValueError:
1196
        pass
1197
      result = self.rpc.call_upload_file(node_list,
1198
                                         constants.SSH_KNOWN_HOSTS_FILE)
1199
      for to_node, to_result in result.iteritems():
1200
        if to_result.failed or not to_result.data:
1201
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1202

    
1203
    finally:
1204
      result = self.rpc.call_node_start_master(master, False)
1205
      if result.failed or not result.data:
1206
        self.LogWarning("Could not re-enable the master role on"
1207
                        " the master, please restart manually.")
1208

    
1209

    
1210
def _RecursiveCheckIfLVMBased(disk):
1211
  """Check if the given disk or its children are lvm-based.
1212

1213
  @type disk: L{objects.Disk}
1214
  @param disk: the disk to check
1215
  @rtype: boolean
1216
  @return: boolean indicating whether an LD_LV dev_type was found or not
1217

1218
  """
1219
  if disk.children:
1220
    for chdisk in disk.children:
1221
      if _RecursiveCheckIfLVMBased(chdisk):
1222
        return True
1223
  return disk.dev_type == constants.LD_LV
1224

    
1225

    
1226
class LUSetClusterParams(LogicalUnit):
1227
  """Change the parameters of the cluster.
1228

1229
  """
1230
  HPATH = "cluster-modify"
1231
  HTYPE = constants.HTYPE_CLUSTER
1232
  _OP_REQP = []
1233
  REQ_BGL = False
1234

    
1235
  def CheckParameters(self):
1236
    """Check parameters
1237

1238
    """
1239
    if not hasattr(self.op, "candidate_pool_size"):
1240
      self.op.candidate_pool_size = None
1241
    if self.op.candidate_pool_size is not None:
1242
      try:
1243
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244
      except ValueError, err:
1245
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246
                                   str(err))
1247
      if self.op.candidate_pool_size < 1:
1248
        raise errors.OpPrereqError("At least one master candidate needed")
1249

    
1250
  def ExpandNames(self):
1251
    # FIXME: in the future maybe other cluster params won't require checking on
1252
    # all nodes to be modified.
1253
    self.needed_locks = {
1254
      locking.LEVEL_NODE: locking.ALL_SET,
1255
    }
1256
    self.share_locks[locking.LEVEL_NODE] = 1
1257

    
1258
  def BuildHooksEnv(self):
1259
    """Build hooks env.
1260

1261
    """
1262
    env = {
1263
      "OP_TARGET": self.cfg.GetClusterName(),
1264
      "NEW_VG_NAME": self.op.vg_name,
1265
      }
1266
    mn = self.cfg.GetMasterNode()
1267
    return env, [mn], [mn]
1268

    
1269
  def CheckPrereq(self):
1270
    """Check prerequisites.
1271

1272
    This checks whether the given params don't conflict and
1273
    if the given volume group is valid.
1274

1275
    """
1276
    # FIXME: This only works because there is only one parameter that can be
1277
    # changed or removed.
1278
    if self.op.vg_name is not None and not self.op.vg_name:
1279
      instances = self.cfg.GetAllInstancesInfo().values()
1280
      for inst in instances:
1281
        for disk in inst.disks:
1282
          if _RecursiveCheckIfLVMBased(disk):
1283
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1284
                                       " lvm-based instances exist")
1285

    
1286
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1287

    
1288
    # if vg_name not None, checks given volume group on all nodes
1289
    if self.op.vg_name:
1290
      vglist = self.rpc.call_vg_list(node_list)
1291
      for node in node_list:
1292
        if vglist[node].failed:
1293
          # ignoring down node
1294
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295
          continue
1296
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297
                                              self.op.vg_name,
1298
                                              constants.MIN_VG_SIZE)
1299
        if vgstatus:
1300
          raise errors.OpPrereqError("Error on node '%s': %s" %
1301
                                     (node, vgstatus))
1302

    
1303
    self.cluster = cluster = self.cfg.GetClusterInfo()
1304
    # validate beparams changes
1305
    if self.op.beparams:
1306
      utils.CheckBEParams(self.op.beparams)
1307
      self.new_beparams = cluster.FillDict(
1308
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309

    
1310
    # hypervisor list/parameters
1311
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312
    if self.op.hvparams:
1313
      if not isinstance(self.op.hvparams, dict):
1314
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315
      for hv_name, hv_dict in self.op.hvparams.items():
1316
        if hv_name not in self.new_hvparams:
1317
          self.new_hvparams[hv_name] = hv_dict
1318
        else:
1319
          self.new_hvparams[hv_name].update(hv_dict)
1320

    
1321
    if self.op.enabled_hypervisors is not None:
1322
      self.hv_list = self.op.enabled_hypervisors
1323
    else:
1324
      self.hv_list = cluster.enabled_hypervisors
1325

    
1326
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327
      # either the enabled list has changed, or the parameters have, validate
1328
      for hv_name, hv_params in self.new_hvparams.items():
1329
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330
            (self.op.enabled_hypervisors and
1331
             hv_name in self.op.enabled_hypervisors)):
1332
          # either this is a new hypervisor, or its parameters have changed
1333
          hv_class = hypervisor.GetHypervisor(hv_name)
1334
          hv_class.CheckParameterSyntax(hv_params)
1335
          _CheckHVParams(self, node_list, hv_name, hv_params)
1336

    
1337
  def Exec(self, feedback_fn):
1338
    """Change the parameters of the cluster.
1339

1340
    """
1341
    if self.op.vg_name is not None:
1342
      if self.op.vg_name != self.cfg.GetVGName():
1343
        self.cfg.SetVGName(self.op.vg_name)
1344
      else:
1345
        feedback_fn("Cluster LVM configuration already in desired"
1346
                    " state, not changing")
1347
    if self.op.hvparams:
1348
      self.cluster.hvparams = self.new_hvparams
1349
    if self.op.enabled_hypervisors is not None:
1350
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351
    if self.op.beparams:
1352
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353
    if self.op.candidate_pool_size is not None:
1354
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355

    
1356
    self.cfg.Update(self.cluster)
1357

    
1358
    # we want to update nodes after the cluster so that if any errors
1359
    # happen, we have recorded and saved the cluster info
1360
    if self.op.candidate_pool_size is not None:
1361
      node_info = self.cfg.GetAllNodesInfo().values()
1362
      num_candidates = len([node for node in node_info
1363
                            if node.master_candidate])
1364
      num_nodes = len(node_info)
1365
      if num_candidates < self.op.candidate_pool_size:
1366
        random.shuffle(node_info)
1367
        for node in node_info:
1368
          if num_candidates >= self.op.candidate_pool_size:
1369
            break
1370
          if node.master_candidate:
1371
            continue
1372
          node.master_candidate = True
1373
          self.LogInfo("Promoting node %s to master candidate", node.name)
1374
          self.cfg.Update(node)
1375
          self.context.ReaddNode(node)
1376
          num_candidates += 1
1377
      elif num_candidates > self.op.candidate_pool_size:
1378
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379
                     " of candidate_pool_size (%d)" %
1380
                     (num_candidates, self.op.candidate_pool_size))
1381

    
1382

    
1383
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384
  """Sleep and poll for an instance's disk to sync.
1385

1386
  """
1387
  if not instance.disks:
1388
    return True
1389

    
1390
  if not oneshot:
1391
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392

    
1393
  node = instance.primary_node
1394

    
1395
  for dev in instance.disks:
1396
    lu.cfg.SetDiskID(dev, node)
1397

    
1398
  retries = 0
1399
  while True:
1400
    max_time = 0
1401
    done = True
1402
    cumul_degraded = False
1403
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404
    if rstats.failed or not rstats.data:
1405
      lu.LogWarning("Can't get any data from node %s", node)
1406
      retries += 1
1407
      if retries >= 10:
1408
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1409
                                 " aborting." % node)
1410
      time.sleep(6)
1411
      continue
1412
    rstats = rstats.data
1413
    retries = 0
1414
    for i in range(len(rstats)):
1415
      mstat = rstats[i]
1416
      if mstat is None:
1417
        lu.LogWarning("Can't compute data for node %s/%s",
1418
                           node, instance.disks[i].iv_name)
1419
        continue
1420
      # we ignore the ldisk parameter
1421
      perc_done, est_time, is_degraded, _ = mstat
1422
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423
      if perc_done is not None:
1424
        done = False
1425
        if est_time is not None:
1426
          rem_time = "%d estimated seconds remaining" % est_time
1427
          max_time = est_time
1428
        else:
1429
          rem_time = "no time estimate"
1430
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431
                        (instance.disks[i].iv_name, perc_done, rem_time))
1432
    if done or oneshot:
1433
      break
1434

    
1435
    time.sleep(min(60, max_time))
1436

    
1437
  if done:
1438
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439
  return not cumul_degraded
1440

    
1441

    
1442
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


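# Note (added for clarity, not part of the original module): in LUDiagnoseOS
# below, the "valid" output field is computed as
#   utils.all([osl and osl[0] for osl in os_data.values()])
# i.e. an OS name is reported as valid only if every locked node returned at
# least one entry for it and the first entry evaluates to true.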
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @returns: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


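# Note (added for clarity, not part of the original module): removing a node
# can leave the master candidate pool below candidate_pool_size, so
# LURemoveNode.Exec below promotes randomly chosen non-candidate nodes until
# the pool is full again or no further nodes are available; the same
# promotion pattern is used when the pool size is changed in
# LUSetClusterParams above.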
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    node_info = self.cfg.GetAllNodesInfo().values()
    num_candidates = len([n for n in node_info
                          if n.master_candidate])
    num_nodes = len(node_info)
    random.shuffle(node_info)
    for node in node_info:
      if num_candidates >= cp_size or num_candidates >= num_nodes:
        break
      if node.master_candidate:
        continue
      node.master_candidate = True
      self.LogInfo("Promoting node %s to master candidate", node.name)
      self.cfg.Update(node)
      self.context.ReaddNode(node)
      num_candidates += 1


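# Note (added for clarity, not part of the original module): LUQueryNodes
# below only acquires (shared) node locks when at least one requested field
# is dynamic, i.e. not in _FIELDS_STATIC; purely static queries are answered
# from the configuration without locking the nodes or issuing the
# call_node_info RPC.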
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


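# Note (added for clarity, not part of the original module): each volume
# returned by call_node_volumes is assumed to be a dict carrying at least the
# keys used below, roughly of the form
#   {'dev': '/dev/sda1', 'vg': 'xenvg', 'name': 'lv-name', 'size': 1024.0}
# the example values are purely illustrative.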
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


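# Note (added for clarity, not part of the original module): LUAddNode.Exec
# below performs, in order, a protocol version check against the new node, a
# copy of the host SSH keys (the keyarray entries match the parameter order
# of call_node_add), an /etc/hosts and known_hosts update, a remote
# ssh/hostname verification via call_node_verify, and finally distribution
# of the updated files to all nodes.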
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    node_info = self.cfg.GetAllNodesInfo().values()
    num_candidates = len([n for n in node_info
                          if n.master_candidate])
    master_candidate = num_candidates < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate,
                                 offline=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if result.failed or not result.data:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if result[node].failed or not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUSetNodeParams(LogicalUnit):
2089
  """Modifies the parameters of a node.
2090

2091
  """
2092
  HPATH = "node-modify"
2093
  HTYPE = constants.HTYPE_NODE
2094
  _OP_REQP = ["node_name"]
2095
  REQ_BGL = False
2096

    
2097
  def CheckArguments(self):
2098
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2099
    if node_name is None:
2100
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2101
    self.op.node_name = node_name
2102
    if not hasattr(self.op, 'master_candidate'):
2103
      raise errors.OpPrereqError("Please pass at least one modification")
2104
    self.op.master_candidate = bool(self.op.master_candidate)
2105

    
2106
  def ExpandNames(self):
2107
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2108

    
2109
  def BuildHooksEnv(self):
2110
    """Build hooks env.
2111

2112
    This runs on the master node.
2113

2114
    """
2115
    env = {
2116
      "OP_TARGET": self.op.node_name,
2117
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2118
      }
2119
    nl = [self.cfg.GetMasterNode(),
2120
          self.op.node_name]
2121
    return env, nl, nl
2122

    
2123
  def CheckPrereq(self):
2124
    """Check prerequisites.
2125

2126
    This only checks the instance list against the existing names.
2127

2128
    """
2129
    force = self.force = self.op.force
2130

    
2131
    if self.op.master_candidate == False:
2132
      if self.op.node_name == self.cfg.GetMasterNode():
2133
        raise errors.OpPrereqError("The master node has to be a"
2134
                                   " master candidate")
2135
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2136
      node_info = self.cfg.GetAllNodesInfo().values()
2137
      num_candidates = len([node for node in node_info
2138
                            if node.master_candidate])
2139
      if num_candidates <= cp_size:
2140
        msg = ("Not enough master candidates (desired"
2141
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2142
        if force:
2143
          self.LogWarning(msg)
2144
        else:
2145
          raise errors.OpPrereqError(msg)
2146

    
2147
    return
2148

    
2149
  def Exec(self, feedback_fn):
2150
    """Modifies a node.
2151

2152
    """
2153
    node = self.cfg.GetNodeInfo(self.op.node_name)
2154

    
2155
    result = []
2156

    
2157
    if self.op.master_candidate is not None:
2158
      node.master_candidate = self.op.master_candidate
2159
      result.append(("master_candidate", str(self.op.master_candidate)))
2160
      if self.op.master_candidate == False:
2161
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2162
        if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2163
            or len(rrc.data) != 2):
2164
          self.LogWarning("Node rpc error: %s" % rrc.error)
2165
        elif not rrc.data[0]:
2166
          self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2167

    
2168
    # this will trigger configuration file update, if needed
2169
    self.cfg.Update(node)
2170
    # this will trigger job queue propagation or cleanup
2171
    if self.op.node_name != self.cfg.GetMasterNode():
2172
      self.context.ReaddNode(node)
2173

    
2174
    return result
2175

    
2176

    
2177
class LUQueryClusterInfo(NoHooksLU):
2178
  """Query cluster configuration.
2179

2180
  """
2181
  _OP_REQP = []
2182
  REQ_BGL = False
2183

    
2184
  def ExpandNames(self):
2185
    self.needed_locks = {}
2186

    
2187
  def CheckPrereq(self):
2188
    """No prerequsites needed for this LU.
2189

2190
    """
2191
    pass
2192

    
2193
  def Exec(self, feedback_fn):
2194
    """Return cluster config.
2195

2196
    """
2197
    cluster = self.cfg.GetClusterInfo()
2198
    result = {
2199
      "software_version": constants.RELEASE_VERSION,
2200
      "protocol_version": constants.PROTOCOL_VERSION,
2201
      "config_version": constants.CONFIG_VERSION,
2202
      "os_api_version": constants.OS_API_VERSION,
2203
      "export_version": constants.EXPORT_VERSION,
2204
      "architecture": (platform.architecture()[0], platform.machine()),
2205
      "name": cluster.cluster_name,
2206
      "master": cluster.master_node,
2207
      "default_hypervisor": cluster.default_hypervisor,
2208
      "enabled_hypervisors": cluster.enabled_hypervisors,
2209
      "hvparams": cluster.hvparams,
2210
      "beparams": cluster.beparams,
2211
      "candidate_pool_size": cluster.candidate_pool_size,
2212
      }
2213

    
2214
    return result
2215

    
2216

    
2217
class LUQueryConfigValues(NoHooksLU):
2218
  """Return configuration values.
2219

2220
  """
2221
  _OP_REQP = []
2222
  REQ_BGL = False
2223
  _FIELDS_DYNAMIC = utils.FieldSet()
2224
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2225

    
2226
  def ExpandNames(self):
2227
    self.needed_locks = {}
2228

    
2229
    _CheckOutputFields(static=self._FIELDS_STATIC,
2230
                       dynamic=self._FIELDS_DYNAMIC,
2231
                       selected=self.op.output_fields)
2232

    
2233
  def CheckPrereq(self):
2234
    """No prerequisites.
2235

2236
    """
2237
    pass
2238

    
2239
  def Exec(self, feedback_fn):
2240
    """Dump a representation of the cluster config to the standard output.
2241

2242
    """
2243
    values = []
2244
    for field in self.op.output_fields:
2245
      if field == "cluster_name":
2246
        entry = self.cfg.GetClusterName()
2247
      elif field == "master_node":
2248
        entry = self.cfg.GetMasterNode()
2249
      elif field == "drain_flag":
2250
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2251
      else:
2252
        raise errors.ParameterError(field)
2253
      values.append(entry)
2254
    return values
2255

    
2256

    
2257
class LUActivateInstanceDisks(NoHooksLU):
2258
  """Bring up an instance's disks.
2259

2260
  """
2261
  _OP_REQP = ["instance_name"]
2262
  REQ_BGL = False
2263

    
2264
  def ExpandNames(self):
2265
    self._ExpandAndLockInstance()
2266
    self.needed_locks[locking.LEVEL_NODE] = []
2267
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2268

    
2269
  def DeclareLocks(self, level):
2270
    if level == locking.LEVEL_NODE:
2271
      self._LockInstancesNodes()
2272

    
2273
  def CheckPrereq(self):
2274
    """Check prerequisites.
2275

2276
    This checks that the instance is in the cluster.
2277

2278
    """
2279
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2280
    assert self.instance is not None, \
2281
      "Cannot retrieve locked instance %s" % self.op.instance_name
2282

    
2283
  def Exec(self, feedback_fn):
2284
    """Activate the disks.
2285

2286
    """
2287
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2288
    if not disks_ok:
2289
      raise errors.OpExecError("Cannot activate block devices")
2290

    
2291
    return disks_info
2292

    
2293

    
2294
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2295
  """Prepare the block devices for an instance.
2296

2297
  This sets up the block devices on all nodes.
2298

2299
  @type lu: L{LogicalUnit}
2300
  @param lu: the logical unit on whose behalf we execute
2301
  @type instance: L{objects.Instance}
2302
  @param instance: the instance for whose disks we assemble
2303
  @type ignore_secondaries: boolean
2304
  @param ignore_secondaries: if true, errors on secondary nodes
2305
      won't result in an error return from the function
2306
  @return: a tuple of (disks_ok, device_info), where device_info is a list
      of (host, instance_visible_name, node_visible_name) tuples with the
      mapping from node devices to instance devices
2309

2310
  """
2311
  device_info = []
2312
  disks_ok = True
2313
  iname = instance.name
2314
  # With the two passes mechanism we try to reduce the window of
2315
  # opportunity for the race condition of switching DRBD to primary
2316
  # before handshaking occurred, but we do not eliminate it
2317

    
2318
  # The proper fix would be to wait (with some limits) until the
2319
  # connection has been made and drbd transitions from WFConnection
2320
  # into any other network-connected state (Connected, SyncTarget,
2321
  # SyncSource, etc.)
2322

    
2323
  # 1st pass, assemble on all nodes in secondary mode
2324
  for inst_disk in instance.disks:
2325
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2326
      lu.cfg.SetDiskID(node_disk, node)
2327
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2328
      if result.failed or not result:
2329
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2330
                           " (is_primary=False, pass=1)",
2331
                           inst_disk.iv_name, node)
2332
        if not ignore_secondaries:
2333
          disks_ok = False
2334

    
2335
  # FIXME: race condition on drbd migration to primary
2336

    
2337
  # 2nd pass, do only the primary node
2338
  for inst_disk in instance.disks:
2339
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2340
      if node != instance.primary_node:
2341
        continue
2342
      lu.cfg.SetDiskID(node_disk, node)
2343
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2344
      if result.failed or not result:
2345
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2346
                           " (is_primary=True, pass=2)",
2347
                           inst_disk.iv_name, node)
2348
        disks_ok = False
2349
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2350

    
2351
  # leave the disks configured for the primary node
2352
  # this is a workaround that would be fixed better by
2353
  # improving the logical/physical id handling
2354
  for disk in instance.disks:
2355
    lu.cfg.SetDiskID(disk, instance.primary_node)
2356

    
2357
  return disks_ok, device_info
2358

    
2359

    
2360
def _StartInstanceDisks(lu, instance, force):
2361
  """Start the disks of an instance.
2362

2363
  """
2364
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2365
                                           ignore_secondaries=force)
2366
  if not disks_ok:
2367
    _ShutdownInstanceDisks(lu, instance)
2368
    if force is not None and not force:
2369
      lu.proc.LogWarning("", hint="If the message above refers to a"
2370
                         " secondary node,"
2371
                         " you can retry the operation using '--force'.")
2372
    raise errors.OpExecError("Disk consistency error")
2373

    
2374

    
2375
class LUDeactivateInstanceDisks(NoHooksLU):
2376
  """Shutdown an instance's disks.
2377

2378
  """
2379
  _OP_REQP = ["instance_name"]
2380
  REQ_BGL = False
2381

    
2382
  def ExpandNames(self):
2383
    self._ExpandAndLockInstance()
2384
    self.needed_locks[locking.LEVEL_NODE] = []
2385
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2386

    
2387
  def DeclareLocks(self, level):
2388
    if level == locking.LEVEL_NODE:
2389
      self._LockInstancesNodes()
2390

    
2391
  def CheckPrereq(self):
2392
    """Check prerequisites.
2393

2394
    This checks that the instance is in the cluster.
2395

2396
    """
2397
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2398
    assert self.instance is not None, \
2399
      "Cannot retrieve locked instance %s" % self.op.instance_name
2400

    
2401
  def Exec(self, feedback_fn):
2402
    """Deactivate the disks
2403

2404
    """
2405
    instance = self.instance
2406
    _SafeShutdownInstanceDisks(self, instance)
2407

    
2408

    
2409
def _SafeShutdownInstanceDisks(lu, instance):
2410
  """Shutdown block devices of an instance.
2411

2412
  This function checks if an instance is running, before calling
2413
  _ShutdownInstanceDisks.
2414

2415
  """
2416
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2417
                                      [instance.hypervisor])
2418
  ins_l = ins_l[instance.primary_node]
2419
  if ins_l.failed or not isinstance(ins_l.data, list):
2420
    raise errors.OpExecError("Can't contact node '%s'" %
2421
                             instance.primary_node)
2422

    
2423
  if instance.name in ins_l.data:
2424
    raise errors.OpExecError("Instance is running, can't shutdown"
2425
                             " block devices.")
2426

    
2427
  _ShutdownInstanceDisks(lu, instance)
2428

    
2429

    
2430
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2431
  """Shutdown block devices of an instance.
2432

2433
  This does the shutdown on all nodes of the instance.
2434

2435
  If ignore_primary is true, errors on the primary node are
  ignored.
2437

2438
  """
2439
  result = True
2440
  for disk in instance.disks:
2441
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2442
      lu.cfg.SetDiskID(top_disk, node)
2443
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2444
      if result.failed or not result.data:
2445
        logging.error("Could not shutdown block device %s on node %s",
2446
                      disk.iv_name, node)
2447
        if not ignore_primary or node != instance.primary_node:
2448
          result = False
2449
  return result
2450

    
2451

    
2452
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2453
  """Checks if a node has enough free memory.
2454

2455
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2459

2460
  @type lu: C{LogicalUnit}
2461
  @param lu: a logical unit from which we get configuration data
2462
  @type node: C{str}
2463
  @param node: the node to check
2464
  @type reason: C{str}
2465
  @param reason: string to use in the error message
2466
  @type requested: C{int}
2467
  @param requested: the amount of memory in MiB to check for
2468
  @type hypervisor: C{str}
2469
  @param hypervisor: the hypervisor to ask for memory stats
2470
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2471
      we cannot check the node
2472

2473
  """
2474
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2475
  nodeinfo[node].Raise()
2476
  free_mem = nodeinfo[node].data.get('memory_free')
2477
  if not isinstance(free_mem, int):
2478
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2479
                             " was '%s'" % (node, free_mem))
2480
  if requested > free_mem:
2481
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2482
                             " needed %s MiB, available %s MiB" %
2483
                             (node, reason, requested, free_mem))
2484

    
2485

    
2486
class LUStartupInstance(LogicalUnit):
2487
  """Starts an instance.
2488

2489
  """
2490
  HPATH = "instance-start"
2491
  HTYPE = constants.HTYPE_INSTANCE
2492
  _OP_REQP = ["instance_name", "force"]
2493
  REQ_BGL = False
2494

    
2495
  def ExpandNames(self):
2496
    self._ExpandAndLockInstance()
2497

    
2498
  def BuildHooksEnv(self):
2499
    """Build hooks env.
2500

2501
    This runs on master, primary and secondary nodes of the instance.
2502

2503
    """
2504
    env = {
2505
      "FORCE": self.op.force,
2506
      }
2507
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2508
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2509
          list(self.instance.secondary_nodes))
2510
    return env, nl, nl
2511

    
2512
  def CheckPrereq(self):
2513
    """Check prerequisites.
2514

2515
    This checks that the instance is in the cluster.
2516

2517
    """
2518
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2519
    assert self.instance is not None, \
2520
      "Cannot retrieve locked instance %s" % self.op.instance_name
2521

    
2522
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2523
    # check bridges existence
2524
    _CheckInstanceBridgesExist(self, instance)
2525

    
2526
    _CheckNodeFreeMemory(self, instance.primary_node,
2527
                         "starting instance %s" % instance.name,
2528
                         bep[constants.BE_MEMORY], instance.hypervisor)
2529

    
2530
  def Exec(self, feedback_fn):
2531
    """Start the instance.
2532

2533
    """
2534
    instance = self.instance
2535
    force = self.op.force
2536
    extra_args = getattr(self.op, "extra_args", "")
2537

    
2538
    self.cfg.MarkInstanceUp(instance.name)
2539

    
2540
    node_current = instance.primary_node
2541

    
2542
    _StartInstanceDisks(self, instance, force)
2543

    
2544
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2545
    if result.failed or not result.data:
2546
      _ShutdownInstanceDisks(self, instance)
2547
      raise errors.OpExecError("Could not start instance")
2548

    
2549

    
2550
class LURebootInstance(LogicalUnit):
2551
  """Reboot an instance.
2552

2553
  """
2554
  HPATH = "instance-reboot"
2555
  HTYPE = constants.HTYPE_INSTANCE
2556
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2557
  REQ_BGL = False
2558

    
2559
  def ExpandNames(self):
2560
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2561
                                   constants.INSTANCE_REBOOT_HARD,
2562
                                   constants.INSTANCE_REBOOT_FULL]:
2563
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2564
                                  (constants.INSTANCE_REBOOT_SOFT,
2565
                                   constants.INSTANCE_REBOOT_HARD,
2566
                                   constants.INSTANCE_REBOOT_FULL))
2567
    self._ExpandAndLockInstance()
2568

    
2569
  def BuildHooksEnv(self):
2570
    """Build hooks env.
2571

2572
    This runs on master, primary and secondary nodes of the instance.
2573

2574
    """
2575
    env = {
2576
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2577
      }
2578
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2579
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2580
          list(self.instance.secondary_nodes))
2581
    return env, nl, nl
2582

    
2583
  def CheckPrereq(self):
2584
    """Check prerequisites.
2585

2586
    This checks that the instance is in the cluster.
2587

2588
    """
2589
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2590
    assert self.instance is not None, \
2591
      "Cannot retrieve locked instance %s" % self.op.instance_name
2592

    
2593
    # check bridges existence
2594
    _CheckInstanceBridgesExist(self, instance)
2595

    
2596
  def Exec(self, feedback_fn):
2597
    """Reboot the instance.
2598

2599
    """
2600
    instance = self.instance
2601
    ignore_secondaries = self.op.ignore_secondaries
2602
    reboot_type = self.op.reboot_type
2603
    extra_args = getattr(self.op, "extra_args", "")
2604

    
2605
    node_current = instance.primary_node
2606

    
2607
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2608
                       constants.INSTANCE_REBOOT_HARD]:
2609
      result = self.rpc.call_instance_reboot(node_current, instance,
2610
                                             reboot_type, extra_args)
2611
      if result.failed or not result.data:
2612
        raise errors.OpExecError("Could not reboot instance")
2613
    else:
2614
      if not self.rpc.call_instance_shutdown(node_current, instance):
2615
        raise errors.OpExecError("could not shutdown instance for full reboot")
2616
      _ShutdownInstanceDisks(self, instance)
2617
      _StartInstanceDisks(self, instance, ignore_secondaries)
2618
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2619
      if result.failed or not result.data:
2620
        _ShutdownInstanceDisks(self, instance)
2621
        raise errors.OpExecError("Could not start instance for full reboot")
2622

    
2623
    self.cfg.MarkInstanceUp(instance.name)
2624

    
2625

    
2626
class LUShutdownInstance(LogicalUnit):
2627
  """Shutdown an instance.
2628

2629
  """
2630
  HPATH = "instance-stop"
2631
  HTYPE = constants.HTYPE_INSTANCE
2632
  _OP_REQP = ["instance_name"]
2633
  REQ_BGL = False
2634

    
2635
  def ExpandNames(self):
2636
    self._ExpandAndLockInstance()
2637

    
2638
  def BuildHooksEnv(self):
2639
    """Build hooks env.
2640

2641
    This runs on master, primary and secondary nodes of the instance.
2642

2643
    """
2644
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2645
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2646
          list(self.instance.secondary_nodes))
2647
    return env, nl, nl
2648

    
2649
  def CheckPrereq(self):
2650
    """Check prerequisites.
2651

2652
    This checks that the instance is in the cluster.
2653

2654
    """
2655
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2656
    assert self.instance is not None, \
2657
      "Cannot retrieve locked instance %s" % self.op.instance_name
2658

    
2659
  def Exec(self, feedback_fn):
2660
    """Shutdown the instance.
2661

2662
    """
2663
    instance = self.instance
2664
    node_current = instance.primary_node
2665
    self.cfg.MarkInstanceDown(instance.name)
2666
    result = self.rpc.call_instance_shutdown(node_current, instance)
2667
    if result.failed or not result.data:
2668
      self.proc.LogWarning("Could not shutdown instance")
2669

    
2670
    _ShutdownInstanceDisks(self, instance)
2671

    
2672

    
2673
class LUReinstallInstance(LogicalUnit):
2674
  """Reinstall an instance.
2675

2676
  """
2677
  HPATH = "instance-reinstall"
2678
  HTYPE = constants.HTYPE_INSTANCE
2679
  _OP_REQP = ["instance_name"]
2680
  REQ_BGL = False
2681

    
2682
  def ExpandNames(self):
2683
    self._ExpandAndLockInstance()
2684

    
2685
  def BuildHooksEnv(self):
2686
    """Build hooks env.
2687

2688
    This runs on master, primary and secondary nodes of the instance.
2689

2690
    """
2691
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2692
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2693
          list(self.instance.secondary_nodes))
2694
    return env, nl, nl
2695

    
2696
  def CheckPrereq(self):
2697
    """Check prerequisites.
2698

2699
    This checks that the instance is in the cluster and is not running.
2700

2701
    """
2702
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2703
    assert instance is not None, \
2704
      "Cannot retrieve locked instance %s" % self.op.instance_name
2705

    
2706
    if instance.disk_template == constants.DT_DISKLESS:
2707
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2708
                                 self.op.instance_name)
2709
    if instance.status != "down":
2710
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2711
                                 self.op.instance_name)
2712
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2713
                                              instance.name,
2714
                                              instance.hypervisor)
2715
    if remote_info.failed or remote_info.data:
2716
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2717
                                 (self.op.instance_name,
2718
                                  instance.primary_node))
2719

    
2720
    self.op.os_type = getattr(self.op, "os_type", None)
2721
    if self.op.os_type is not None:
2722
      # OS verification
2723
      pnode = self.cfg.GetNodeInfo(
2724
        self.cfg.ExpandNodeName(instance.primary_node))
2725
      if pnode is None:
2726
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2727
                                   self.op.pnode)
2728
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2729
      result.Raise()
2730
      if not isinstance(result.data, objects.OS):
2731
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2732
                                   " primary node"  % self.op.os_type)
2733

    
2734
    self.instance = instance
2735

    
2736
  def Exec(self, feedback_fn):
2737
    """Reinstall the instance.
2738

2739
    """
2740
    inst = self.instance
2741

    
2742
    if self.op.os_type is not None:
2743
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2744
      inst.os = self.op.os_type
2745
      self.cfg.Update(inst)
2746

    
2747
    _StartInstanceDisks(self, inst, None)
2748
    try:
2749
      feedback_fn("Running the instance OS create scripts...")
2750
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2751
      result.Raise()
2752
      if not result.data:
2753
        raise errors.OpExecError("Could not install OS for instance %s"
2754
                                 " on node %s" %
2755
                                 (inst.name, inst.primary_node))
2756
    finally:
2757
      _ShutdownInstanceDisks(self, inst)
2758

    
2759

    
2760
class LURenameInstance(LogicalUnit):
2761
  """Rename an instance.
2762

2763
  """
2764
  HPATH = "instance-rename"
2765
  HTYPE = constants.HTYPE_INSTANCE
2766
  _OP_REQP = ["instance_name", "new_name"]
2767

    
2768
  def BuildHooksEnv(self):
2769
    """Build hooks env.
2770

2771
    This runs on master, primary and secondary nodes of the instance.
2772

2773
    """
2774
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2775
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2776
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2777
          list(self.instance.secondary_nodes))
2778
    return env, nl, nl
2779

    
2780
  def CheckPrereq(self):
2781
    """Check prerequisites.
2782

2783
    This checks that the instance is in the cluster and is not running.
2784

2785
    """
2786
    instance = self.cfg.GetInstanceInfo(
2787
      self.cfg.ExpandInstanceName(self.op.instance_name))
2788
    if instance is None:
2789
      raise errors.OpPrereqError("Instance '%s' not known" %
2790
                                 self.op.instance_name)
2791
    if instance.status != "down":
2792
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2793
                                 self.op.instance_name)
2794
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2795
                                              instance.name,
2796
                                              instance.hypervisor)
2797
    remote_info.Raise()
2798
    if remote_info.data:
2799
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2800
                                 (self.op.instance_name,
2801
                                  instance.primary_node))
2802
    self.instance = instance
2803

    
2804
    # new name verification
2805
    name_info = utils.HostInfo(self.op.new_name)
2806

    
2807
    self.op.new_name = new_name = name_info.name
2808
    instance_list = self.cfg.GetInstanceList()
2809
    if new_name in instance_list:
2810
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2811
                                 new_name)
2812

    
2813
    if not getattr(self.op, "ignore_ip", False):
2814
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2815
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2816
                                   (name_info.ip, new_name))
2817

    
2818

    
2819
  def Exec(self, feedback_fn):
2820
    """Reinstall the instance.
2821

2822
    """
2823
    inst = self.instance
2824
    old_name = inst.name
2825

    
2826
    if inst.disk_template == constants.DT_FILE:
2827
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2828

    
2829
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2830
    # Change the instance lock. This is definitely safe while we hold the BGL
2831
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2832
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2833

    
2834
    # re-read the instance from the configuration after rename
2835
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2836

    
2837
    if inst.disk_template == constants.DT_FILE:
2838
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2839
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2840
                                                     old_file_storage_dir,
2841
                                                     new_file_storage_dir)
2842
      result.Raise()
2843
      if not result.data:
2844
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2845
                                 " directory '%s' to '%s' (but the instance"
2846
                                 " has been renamed in Ganeti)" % (
2847
                                 inst.primary_node, old_file_storage_dir,
2848
                                 new_file_storage_dir))
2849

    
2850
      if not result.data[0]:
2851
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2852
                                 " (but the instance has been renamed in"
2853
                                 " Ganeti)" % (old_file_storage_dir,
2854
                                               new_file_storage_dir))
2855

    
2856
    _StartInstanceDisks(self, inst, None)
2857
    try:
2858
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2859
                                                 old_name)
2860
      if result.failed or not result.data:
2861
        msg = ("Could not run OS rename script for instance %s on node %s"
2862
               " (but the instance has been renamed in Ganeti)" %
2863
               (inst.name, inst.primary_node))
2864
        self.proc.LogWarning(msg)
2865
    finally:
2866
      _ShutdownInstanceDisks(self, inst)
2867

    
2868

    
2869
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    if result.failed or not result.data:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state", "admin_ram",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    "(disk).(size)/([0-9]+)",
                                    "(disk).(sizes)",
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
                                    "(nic).(macs|ips|bridges)",
                                    "(disk|nic).(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
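  # The parenthesised entries above are FieldSet patterns; assuming
  # utils.FieldSet treats them as anchored regular expressions, the
  # user-visible field names look like "disk.count", "disk.sizes",
  # "disk.size/0", "nic.macs" or "nic.ip/1", plus one "hv/<name>" and
  # "be/<name>" field per parameter in constants.HVS_PARAMETERS and
  # constants.BES_PARAMETERS.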

    
  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
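        # Taken together, the branches above mean "status" is one of six
        # values: "running", "ADMIN_down", "ERROR_up" (running although
        # marked down), "ERROR_down" (marked up but not running),
        # "ERROR_nodedown" and "ERROR_nodeoffline", the last two describing
        # the primary node rather than the instance itself.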
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    if result.failed or not result.data:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down", instance.name, source_node,
                             source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None)
      if result.failed or not result.data:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
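  # Note that the configuration is updated (the instance's primary node is
  # flipped to the old secondary) before any attempt to restart it, so even
  # if activating the disks or starting the instance fails afterwards, the
  # cluster already considers the target node to be the primary.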

    
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if new_id.failed or not new_id.data:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if new_id.failed or not new_id.data:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
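# The two helpers above differ only in when a device is physically created:
# on the primary node everything is created unconditionally, while on a
# secondary a device (and its subtree) is only created once it, or an
# ancestor, reports CreateOnSecondary() as true; presumably this is what
# ensures e.g. that the LV children of a DRBD8 device exist on both nodes.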

    
def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate unique logical volume names, one for each of the
  given suffixes.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
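# A short usage sketch; the prefix is whatever cfg.GenerateUniqueID returns,
# so the value below is only illustrative:
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   => ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"]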

    
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
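# The returned object is a two-level tree: an LD_DRBD8 device whose
# logical_id is the 6-tuple (primary, secondary, port, p_minor, s_minor,
# shared_secret), backed by two LD_LV children, names[0] holding the data
# (the full 'size' MB) and names[1] holding 128 MB of DRBD metadata.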

    
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
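# For example, with template_name == constants.DT_DRBD8, two entries in
# disk_info and base_index 0, the result is two _GenerateDRBD8Branch trees
# named "disk/0" and "disk/1", each using one (primary, secondary) minor
# pair and LV suffixes ".disk<N>_data"/".disk<N>_meta"; with DT_FILE the
# disks instead point at "<file_storage_dir>/disk0", ".../disk1" and so on.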

    
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
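# E.g. for an instance named "inst1.example.com" (illustrative) this yields
# the metadata string "originstname+inst1.example.com".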

    
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if result.failed or not result.data:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result.data[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      result = lu.rpc.call_blockdev_remove(node, disk)
      if result.failed or not result.data:
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  # Required free disk space as a function of the requested disk sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
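# Worked example: for two disks of 1024 MB and 2048 MB, DT_PLAIN needs
# 1024 + 2048 = 3072 MB of free VG space, while DT_DRBD8 needs
# (1024 + 128) + (2048 + 128) = 3328 MB because of the per-disk metadata
# volume; DT_DISKLESS and DT_FILE consume no VG space at all (None).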

    
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    info.Raise()
    if not info.data or not isinstance(info.data, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info.data))
    if not info.data[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info.data[1])
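# Judging from the checks above, each per-node answer is expected to be a
# (success, message) pair: a falsy first element means the remote hypervisor
# rejected the parameters, and the second element carries the reason.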

    
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.CheckBEParams(self.op.beparams)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")
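
    # From the parsing above, self.op.nics and self.op.disks are expected to
    # be lists of plain dicts, e.g. (values purely illustrative):
    #   nics  = [{"ip": constants.VALUE_AUTO, "mac": constants.VALUE_AUTO}]
    #   disks = [{"size": 1024, "mode": constants.DISK_RDWR}]
    # A missing "mac" or "mode" falls back to VALUE_AUTO / DISK_RDWR, and a
    # missing "bridge" falls back to the cluster's default bridge.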

    
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
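
    # In other words, the chosen iallocator has to hand back exactly
    # ial.required_nodes node names: the first becomes the primary node and,
    # when two nodes are required (mirrored templates), the second becomes
    # the secondary.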

    
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind =