Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 3a5ba66a

History | View | Annotate | Download (217.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overriden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensure
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing these separate is better because:
118

119
      - ExpandNames is left as as purely a lock-related function
120
      - CheckPrereq is run after we have aquired locks (and possible
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods can no longer worry about missing parameters.
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, ecc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-node tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not have 'GANETI_' prefixed as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    No nodes should be returned as an empty list (and not None).
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instance's nodes, or
295
    to just lock primaries or secondary nodes, if needed.
296

297
    If should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we're really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = lu.cfg.GetInstanceList()
396
  return utils.NiceSort(wanted)
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the nodes is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445
                          memory, vcpus, nics):
446
  """Builds instance related env variables for hooks
447

448
  This builds the hook environment from individual variables.
449

450
  @type name: string
451
  @param name: the name of the instance
452
  @type primary_node: string
453
  @param primary_node: the name of the instance's primary node
454
  @type secondary_nodes: list
455
  @param secondary_nodes: list of secondary nodes as strings
456
  @type os_type: string
457
  @param os_type: the name of the instance's OS
458
  @type status: string
459
  @param status: the desired status of the instances
460
  @type memory: string
461
  @param memory: the memory size of the instance
462
  @type vcpus: string
463
  @param vcpus: the count of VCPUs the instance has
464
  @type nics: list
465
  @param nics: list of tuples (ip, bridge, mac) representing
466
      the NICs the instance  has
467
  @rtype: dict
468
  @return: the hook environment for this instance
469

470
  """
471
  env = {
472
    "OP_TARGET": name,
473
    "INSTANCE_NAME": name,
474
    "INSTANCE_PRIMARY": primary_node,
475
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476
    "INSTANCE_OS_TYPE": os_type,
477
    "INSTANCE_STATUS": status,
478
    "INSTANCE_MEMORY": memory,
479
    "INSTANCE_VCPUS": vcpus,
480
  }
481

    
482
  if nics:
483
    nic_count = len(nics)
484
    for idx, (ip, bridge, mac) in enumerate(nics):
485
      if ip is None:
486
        ip = ""
487
      env["INSTANCE_NIC%d_IP" % idx] = ip
488
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
490
  else:
491
    nic_count = 0
492

    
493
  env["INSTANCE_NIC_COUNT"] = nic_count
494

    
495
  return env
496

    
497

    
498
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499
  """Builds instance related env variables for hooks from an object.
500

501
  @type lu: L{LogicalUnit}
502
  @param lu: the logical unit on whose behalf we execute
503
  @type instance: L{objects.Instance}
504
  @param instance: the instance for which we should build the
505
      environment
506
  @type override: dict
507
  @param override: dictionary with key/values that will override
508
      our values
509
  @rtype: dict
510
  @return: the hook environment dictionary
511

512
  """
513
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
514
  args = {
515
    'name': instance.name,
516
    'primary_node': instance.primary_node,
517
    'secondary_nodes': instance.secondary_nodes,
518
    'os_type': instance.os,
519
    'status': instance.os,
520
    'memory': bep[constants.BE_MEMORY],
521
    'vcpus': bep[constants.BE_VCPUS],
522
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
523
  }
524
  if override:
525
    args.update(override)
526
  return _BuildInstanceHookEnv(**args)
527

    
528

    
529
def _AdjustCandidatePool(lu):
530
  """Adjust the candidate pool after node operations.
531

532
  """
533
  mod_list = lu.cfg.MaintainCandidatePool()
534
  if mod_list:
535
    lu.LogInfo("Promoted nodes to master candidate role: %s",
536
               ", ".join(mod_list))
537
    for name in mod_list:
538
      lu.context.ReaddNode(name)
539
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
540
  if mc_now > mc_max:
541
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
542
               (mc_now, mc_max))
543

    
544

    
545
def _CheckInstanceBridgesExist(lu, instance):
546
  """Check that the brigdes needed by an instance exist.
547

548
  """
549
  # check bridges existance
550
  brlist = [nic.bridge for nic in instance.nics]
551
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
552
  result.Raise()
553
  if not result.data:
554
    raise errors.OpPrereqError("One or more target bridges %s does not"
555
                               " exist on destination node '%s'" %
556
                               (brlist, instance.primary_node))
557

    
558

    
559
class LUDestroyCluster(NoHooksLU):
560
  """Logical unit for destroying the cluster.
561

562
  """
563
  _OP_REQP = []
564

    
565
  def CheckPrereq(self):
566
    """Check prerequisites.
567

568
    This checks whether the cluster is empty.
569

570
    Any errors are signalled by raising errors.OpPrereqError.
571

572
    """
573
    master = self.cfg.GetMasterNode()
574

    
575
    nodelist = self.cfg.GetNodeList()
576
    if len(nodelist) != 1 or nodelist[0] != master:
577
      raise errors.OpPrereqError("There are still %d node(s) in"
578
                                 " this cluster." % (len(nodelist) - 1))
579
    instancelist = self.cfg.GetInstanceList()
580
    if instancelist:
581
      raise errors.OpPrereqError("There are still %d instance(s) in"
582
                                 " this cluster." % len(instancelist))
583

    
584
  def Exec(self, feedback_fn):
585
    """Destroys the cluster.
586

587
    """
588
    master = self.cfg.GetMasterNode()
589
    result = self.rpc.call_node_stop_master(master, False)
590
    result.Raise()
591
    if not result.data:
592
      raise errors.OpExecError("Could not disable the master role")
593
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594
    utils.CreateBackup(priv_key)
595
    utils.CreateBackup(pub_key)
596
    return master
597

    
598

    
599
class LUVerifyCluster(LogicalUnit):
600
  """Verifies the cluster status.
601

602
  """
603
  HPATH = "cluster-verify"
604
  HTYPE = constants.HTYPE_CLUSTER
605
  _OP_REQP = ["skip_checks"]
606
  REQ_BGL = False
607

    
608
  def ExpandNames(self):
609
    self.needed_locks = {
610
      locking.LEVEL_NODE: locking.ALL_SET,
611
      locking.LEVEL_INSTANCE: locking.ALL_SET,
612
    }
613
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
614

    
615
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616
                  node_result, feedback_fn, master_files):
617
    """Run multiple tests against a node.
618

619
    Test list:
620

621
      - compares ganeti version
622
      - checks vg existance and size > 20G
623
      - checks config file checksum
624
      - checks ssh to other nodes
625

626
    @type nodeinfo: L{objects.Node}
627
    @param nodeinfo: the node to check
628
    @param file_list: required list of files
629
    @param local_cksum: dictionary of local files and their checksums
630
    @param node_result: the results from the node
631
    @param feedback_fn: function used to accumulate results
632
    @param master_files: list of files that only masters should have
633

634
    """
635
    node = nodeinfo.name
636

    
637
    # main result, node_result should be a non-empty dict
638
    if not node_result or not isinstance(node_result, dict):
639
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
640
      return True
641

    
642
    # compares ganeti version
643
    local_version = constants.PROTOCOL_VERSION
644
    remote_version = node_result.get('version', None)
645
    if not remote_version:
646
      feedback_fn("  - ERROR: connection to %s failed" % (node))
647
      return True
648

    
649
    if local_version != remote_version:
650
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
651
                      (local_version, node, remote_version))
652
      return True
653

    
654
    # checks vg existance and size > 20G
655

    
656
    bad = False
657
    vglist = node_result.get(constants.NV_VGLIST, None)
658
    if not vglist:
659
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
660
                      (node,))
661
      bad = True
662
    else:
663
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664
                                            constants.MIN_VG_SIZE)
665
      if vgstatus:
666
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
667
        bad = True
668

    
669
    # checks config file checksum
670

    
671
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
672
    if not isinstance(remote_cksum, dict):
673
      bad = True
674
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
675
    else:
676
      for file_name in file_list:
677
        node_is_mc = nodeinfo.master_candidate
678
        must_have_file = file_name not in master_files
679
        if file_name not in remote_cksum:
680
          if node_is_mc or must_have_file:
681
            bad = True
682
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
683
        elif remote_cksum[file_name] != local_cksum[file_name]:
684
          if node_is_mc or must_have_file:
685
            bad = True
686
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
687
          else:
688
            # not candidate and this is not a must-have file
689
            bad = True
690
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
691
                        " '%s'" % file_name)
692
        else:
693
          # all good, except non-master/non-must have combination
694
          if not node_is_mc and not must_have_file:
695
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
696
                        " candidates" % file_name)
697

    
698
    # checks ssh to any
699

    
700
    if constants.NV_NODELIST not in node_result:
701
      bad = True
702
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
703
    else:
704
      if node_result[constants.NV_NODELIST]:
705
        bad = True
706
        for node in node_result[constants.NV_NODELIST]:
707
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
708
                          (node, node_result[constants.NV_NODELIST][node]))
709

    
710
    if constants.NV_NODENETTEST not in node_result:
711
      bad = True
712
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
713
    else:
714
      if node_result[constants.NV_NODENETTEST]:
715
        bad = True
716
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
717
        for node in nlist:
718
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
719
                          (node, node_result[constants.NV_NODENETTEST][node]))
720

    
721
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722
    if isinstance(hyp_result, dict):
723
      for hv_name, hv_result in hyp_result.iteritems():
724
        if hv_result is not None:
725
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
726
                      (hv_name, hv_result))
727
    return bad
728

    
729
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730
                      node_instance, feedback_fn, n_offline):
731
    """Verify an instance.
732

733
    This function checks to see if the required block devices are
734
    available on the instance's node.
735

736
    """
737
    bad = False
738

    
739
    node_current = instanceconfig.primary_node
740

    
741
    node_vol_should = {}
742
    instanceconfig.MapLVsByNode(node_vol_should)
743

    
744
    for node in node_vol_should:
745
      if node in n_offline:
746
        # ignore missing volumes on offline nodes
747
        continue
748
      for volume in node_vol_should[node]:
749
        if node not in node_vol_is or volume not in node_vol_is[node]:
750
          feedback_fn("  - ERROR: volume %s missing on node %s" %
751
                          (volume, node))
752
          bad = True
753

    
754
    if not instanceconfig.status == 'down':
755
      if ((node_current not in node_instance or
756
          not instance in node_instance[node_current]) and
757
          node_current not in n_offline):
758
        feedback_fn("  - ERROR: instance %s not running on node %s" %
759
                        (instance, node_current))
760
        bad = True
761

    
762
    for node in node_instance:
763
      if (not node == node_current):
764
        if instance in node_instance[node]:
765
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
766
                          (instance, node))
767
          bad = True
768

    
769
    return bad
770

    
771
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772
    """Verify if there are any unknown volumes in the cluster.
773

774
    The .os, .swap and backup volumes are ignored. All other volumes are
775
    reported as unknown.
776

777
    """
778
    bad = False
779

    
780
    for node in node_vol_is:
781
      for volume in node_vol_is[node]:
782
        if node not in node_vol_should or volume not in node_vol_should[node]:
783
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
784
                      (volume, node))
785
          bad = True
786
    return bad
787

    
788
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789
    """Verify the list of running instances.
790

791
    This checks what instances are running but unknown to the cluster.
792

793
    """
794
    bad = False
795
    for node in node_instance:
796
      for runninginstance in node_instance[node]:
797
        if runninginstance not in instancelist:
798
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
799
                          (runninginstance, node))
800
          bad = True
801
    return bad
802

    
803
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
804
    """Verify N+1 Memory Resilience.
805

806
    Check that if one single node dies we can still start all the instances it
807
    was primary for.
808

809
    """
810
    bad = False
811

    
812
    for node, nodeinfo in node_info.iteritems():
813
      # This code checks that every node which is now listed as secondary has
814
      # enough memory to host all instances it is supposed to should a single
815
      # other node in the cluster fail.
816
      # FIXME: not ready for failover to an arbitrary node
817
      # FIXME: does not support file-backed instances
818
      # WARNING: we currently take into account down instances as well as up
819
      # ones, considering that even if they're down someone might want to start
820
      # them even in the event of a node failure.
821
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
822
        needed_mem = 0
823
        for instance in instances:
824
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
825
          if bep[constants.BE_AUTO_BALANCE]:
826
            needed_mem += bep[constants.BE_MEMORY]
827
        if nodeinfo['mfree'] < needed_mem:
828
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
829
                      " failovers should node %s fail" % (node, prinode))
830
          bad = True
831
    return bad
832

    
833
  def CheckPrereq(self):
834
    """Check prerequisites.
835

836
    Transform the list of checks we're going to skip into a set and check that
837
    all its members are valid.
838

839
    """
840
    self.skip_set = frozenset(self.op.skip_checks)
841
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
842
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
843

    
844
  def BuildHooksEnv(self):
845
    """Build hooks env.
846

847
    Cluster-Verify hooks just rone in the post phase and their failure makes
848
    the output be logged in the verify output and the verification to fail.
849

850
    """
851
    all_nodes = self.cfg.GetNodeList()
852
    # TODO: populate the environment with useful information for verify hooks
853
    env = {}
854
    return env, [], all_nodes
855

    
856
  def Exec(self, feedback_fn):
857
    """Verify integrity of cluster, performing various test on nodes.
858

859
    """
860
    bad = False
861
    feedback_fn("* Verifying global settings")
862
    for msg in self.cfg.VerifyConfig():
863
      feedback_fn("  - ERROR: %s" % msg)
864

    
865
    vg_name = self.cfg.GetVGName()
866
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
867
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
868
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
869
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
870
    i_non_redundant = [] # Non redundant instances
871
    i_non_a_balanced = [] # Non auto-balanced instances
872
    n_offline = [] # List of offline nodes
873
    node_volume = {}
874
    node_instance = {}
875
    node_info = {}
876
    instance_cfg = {}
877

    
878
    # FIXME: verify OS list
879
    # do local checksums
880
    master_files = [constants.CLUSTER_CONF_FILE]
881

    
882
    file_names = ssconf.SimpleStore().GetFileList()
883
    file_names.append(constants.SSL_CERT_FILE)
884
    file_names.extend(master_files)
885

    
886
    local_checksums = utils.FingerprintFiles(file_names)
887

    
888
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
889
    node_verify_param = {
890
      constants.NV_FILELIST: file_names,
891
      constants.NV_NODELIST: nodelist,
892
      constants.NV_HYPERVISOR: hypervisors,
893
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
894
                                  node.secondary_ip) for node in nodeinfo],
895
      constants.NV_LVLIST: vg_name,
896
      constants.NV_INSTANCELIST: hypervisors,
897
      constants.NV_VGLIST: None,
898
      constants.NV_VERSION: None,
899
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
900
      }
901
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
902
                                           self.cfg.GetClusterName())
903

    
904
    cluster = self.cfg.GetClusterInfo()
905
    master_node = self.cfg.GetMasterNode()
906
    for node_i in nodeinfo:
907
      node = node_i.name
908
      nresult = all_nvinfo[node].data
909

    
910
      if node_i.offline:
911
        feedback_fn("* Skipping offline node %s" % (node,))
912
        n_offline.append(node)
913
        continue
914

    
915
      if node == master_node:
916
        ntype = "master"
917
      elif node_i.master_candidate:
918
        ntype = "master candidate"
919
      else:
920
        ntype = "regular"
921
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
922

    
923
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
924
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
925
        bad = True
926
        continue
927

    
928
      result = self._VerifyNode(node_i, file_names, local_checksums,
929
                                nresult, feedback_fn, master_files)
930
      bad = bad or result
931

    
932
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
933
      if isinstance(lvdata, basestring):
934
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
935
                    (node, lvdata.encode('string_escape')))
936
        bad = True
937
        node_volume[node] = {}
938
      elif not isinstance(lvdata, dict):
939
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
940
        bad = True
941
        continue
942
      else:
943
        node_volume[node] = lvdata
944

    
945
      # node_instance
946
      idata = nresult.get(constants.NV_INSTANCELIST, None)
947
      if not isinstance(idata, list):
948
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
949
                    (node,))
950
        bad = True
951
        continue
952

    
953
      node_instance[node] = idata
954

    
955
      # node_info
956
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
957
      if not isinstance(nodeinfo, dict):
958
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
959
        bad = True
960
        continue
961

    
962
      try:
963
        node_info[node] = {
964
          "mfree": int(nodeinfo['memory_free']),
965
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
966
          "pinst": [],
967
          "sinst": [],
968
          # dictionary holding all instances this node is secondary for,
969
          # grouped by their primary node. Each key is a cluster node, and each
970
          # value is a list of instances which have the key as primary and the
971
          # current node as secondary.  this is handy to calculate N+1 memory
972
          # availability if you can only failover from a primary to its
973
          # secondary.
974
          "sinst-by-pnode": {},
975
        }
976
      except ValueError:
977
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
978
        bad = True
979
        continue
980

    
981
    node_vol_should = {}
982

    
983
    for instance in instancelist:
984
      feedback_fn("* Verifying instance %s" % instance)
985
      inst_config = self.cfg.GetInstanceInfo(instance)
986
      result =  self._VerifyInstance(instance, inst_config, node_volume,
987
                                     node_instance, feedback_fn, n_offline)
988
      bad = bad or result
989

    
990
      inst_config.MapLVsByNode(node_vol_should)
991

    
992
      instance_cfg[instance] = inst_config
993

    
994
      pnode = inst_config.primary_node
995
      if pnode in node_info:
996
        node_info[pnode]['pinst'].append(instance)
997
      elif pnode not in n_offline:
998
        feedback_fn("  - ERROR: instance %s, connection to primary node"
999
                    " %s failed" % (instance, pnode))
1000
        bad = True
1001

    
1002
      # If the instance is non-redundant we cannot survive losing its primary
1003
      # node, so we are not N+1 compliant. On the other hand we have no disk
1004
      # templates with more than one secondary so that situation is not well
1005
      # supported either.
1006
      # FIXME: does not support file-backed instances
1007
      if len(inst_config.secondary_nodes) == 0:
1008
        i_non_redundant.append(instance)
1009
      elif len(inst_config.secondary_nodes) > 1:
1010
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1011
                    % instance)
1012

    
1013
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1014
        i_non_a_balanced.append(instance)
1015

    
1016
      for snode in inst_config.secondary_nodes:
1017
        if snode in node_info:
1018
          node_info[snode]['sinst'].append(instance)
1019
          if pnode not in node_info[snode]['sinst-by-pnode']:
1020
            node_info[snode]['sinst-by-pnode'][pnode] = []
1021
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1022
        elif snode not in n_offline:
1023
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1024
                      " %s failed" % (instance, snode))
1025

    
1026
    feedback_fn("* Verifying orphan volumes")
1027
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1028
                                       feedback_fn)
1029
    bad = bad or result
1030

    
1031
    feedback_fn("* Verifying remaining instances")
1032
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1033
                                         feedback_fn)
1034
    bad = bad or result
1035

    
1036
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1037
      feedback_fn("* Verifying N+1 Memory redundancy")
1038
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1039
      bad = bad or result
1040

    
1041
    feedback_fn("* Other Notes")
1042
    if i_non_redundant:
1043
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1044
                  % len(i_non_redundant))
1045

    
1046
    if i_non_a_balanced:
1047
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1048
                  % len(i_non_a_balanced))
1049

    
1050
    if n_offline:
1051
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1052

    
1053
    return not bad
1054

    
1055
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1056
    """Analize the post-hooks' result
1057

1058
    This method analyses the hook result, handles it, and sends some
1059
    nicely-formatted feedback back to the user.
1060

1061
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1062
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1063
    @param hooks_results: the results of the multi-node hooks rpc call
1064
    @param feedback_fn: function used send feedback back to the caller
1065
    @param lu_result: previous Exec result
1066
    @return: the new Exec result, based on the previous result
1067
        and hook results
1068

1069
    """
1070
    # We only really run POST phase hooks, and are only interested in
1071
    # their results
1072
    if phase == constants.HOOKS_PHASE_POST:
1073
      # Used to change hooks' output to proper indentation
1074
      indent_re = re.compile('^', re.M)
1075
      feedback_fn("* Hooks Results")
1076
      if not hooks_results:
1077
        feedback_fn("  - ERROR: general communication failure")
1078
        lu_result = 1
1079
      else:
1080
        for node_name in hooks_results:
1081
          show_node_header = True
1082
          res = hooks_results[node_name]
1083
          if res.failed or res.data is False or not isinstance(res.data, list):
1084
            if res.offline:
1085
              # no need to warn or set fail return value
1086
              continue
1087
            feedback_fn("    Communication failure in hooks execution")
1088
            lu_result = 1
1089
            continue
1090
          for script, hkr, output in res.data:
1091
            if hkr == constants.HKR_FAIL:
1092
              # The node header is only shown once, if there are
1093
              # failing hooks on that node
1094
              if show_node_header:
1095
                feedback_fn("  Node %s:" % node_name)
1096
                show_node_header = False
1097
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1098
              output = indent_re.sub('      ', output)
1099
              feedback_fn("%s" % output)
1100
              lu_result = 1
1101

    
1102
      return lu_result
1103

    
1104

    
1105
class LUVerifyDisks(NoHooksLU):
1106
  """Verifies the cluster disks status.
1107

1108
  """
1109
  _OP_REQP = []
1110
  REQ_BGL = False
1111

    
1112
  def ExpandNames(self):
1113
    self.needed_locks = {
1114
      locking.LEVEL_NODE: locking.ALL_SET,
1115
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1116
    }
1117
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1118

    
1119
  def CheckPrereq(self):
1120
    """Check prerequisites.
1121

1122
    This has no prerequisites.
1123

1124
    """
1125
    pass
1126

    
1127
  def Exec(self, feedback_fn):
1128
    """Verify integrity of cluster disks.
1129

1130
    """
1131
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1132

    
1133
    vg_name = self.cfg.GetVGName()
1134
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1135
    instances = [self.cfg.GetInstanceInfo(name)
1136
                 for name in self.cfg.GetInstanceList()]
1137

    
1138
    nv_dict = {}
1139
    for inst in instances:
1140
      inst_lvs = {}
1141
      if (inst.status != "up" or
1142
          inst.disk_template not in constants.DTS_NET_MIRROR):
1143
        continue
1144
      inst.MapLVsByNode(inst_lvs)
1145
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1146
      for node, vol_list in inst_lvs.iteritems():
1147
        for vol in vol_list:
1148
          nv_dict[(node, vol)] = inst
1149

    
1150
    if not nv_dict:
1151
      return result
1152

    
1153
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1154

    
1155
    to_act = set()
1156
    for node in nodes:
1157
      # node_volume
1158
      lvs = node_lvs[node]
1159
      if lvs.failed:
1160
        if not lvs.offline:
1161
          self.LogWarning("Connection to node %s failed: %s" %
1162
                          (node, lvs.data))
1163
        continue
1164
      lvs = lvs.data
1165
      if isinstance(lvs, basestring):
1166
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1167
        res_nlvm[node] = lvs
1168
      elif not isinstance(lvs, dict):
1169
        logging.warning("Connection to node %s failed or invalid data"
1170
                        " returned", node)
1171
        res_nodes.append(node)
1172
        continue
1173

    
1174
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1175
        inst = nv_dict.pop((node, lv_name), None)
1176
        if (not lv_online and inst is not None
1177
            and inst.name not in res_instances):
1178
          res_instances.append(inst.name)
1179

    
1180
    # any leftover items in nv_dict are missing LVs, let's arrange the
1181
    # data better
1182
    for key, inst in nv_dict.iteritems():
1183
      if inst.name not in res_missing:
1184
        res_missing[inst.name] = []
1185
      res_missing[inst.name].append(key)
1186

    
1187
    return result
1188

    
1189

    
1190
class LURenameCluster(LogicalUnit):
1191
  """Rename the cluster.
1192

1193
  """
1194
  HPATH = "cluster-rename"
1195
  HTYPE = constants.HTYPE_CLUSTER
1196
  _OP_REQP = ["name"]
1197

    
1198
  def BuildHooksEnv(self):
1199
    """Build hooks env.
1200

1201
    """
1202
    env = {
1203
      "OP_TARGET": self.cfg.GetClusterName(),
1204
      "NEW_NAME": self.op.name,
1205
      }
1206
    mn = self.cfg.GetMasterNode()
1207
    return env, [mn], [mn]
1208

    
1209
  def CheckPrereq(self):
1210
    """Verify that the passed name is a valid one.
1211

1212
    """
1213
    hostname = utils.HostInfo(self.op.name)
1214

    
1215
    new_name = hostname.name
1216
    self.ip = new_ip = hostname.ip
1217
    old_name = self.cfg.GetClusterName()
1218
    old_ip = self.cfg.GetMasterIP()
1219
    if new_name == old_name and new_ip == old_ip:
1220
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1221
                                 " cluster has changed")
1222
    if new_ip != old_ip:
1223
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1224
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1225
                                   " reachable on the network. Aborting." %
1226
                                   new_ip)
1227

    
1228
    self.op.name = new_name
1229

    
1230
  def Exec(self, feedback_fn):
1231
    """Rename the cluster.
1232

1233
    """
1234
    clustername = self.op.name
1235
    ip = self.ip
1236

    
1237
    # shutdown the master IP
1238
    master = self.cfg.GetMasterNode()
1239
    result = self.rpc.call_node_stop_master(master, False)
1240
    if result.failed or not result.data:
1241
      raise errors.OpExecError("Could not disable the master role")
1242

    
1243
    try:
1244
      cluster = self.cfg.GetClusterInfo()
1245
      cluster.cluster_name = clustername
1246
      cluster.master_ip = ip
1247
      self.cfg.Update(cluster)
1248

    
1249
      # update the known hosts file
1250
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1251
      node_list = self.cfg.GetNodeList()
1252
      try:
1253
        node_list.remove(master)
1254
      except ValueError:
1255
        pass
1256
      result = self.rpc.call_upload_file(node_list,
1257
                                         constants.SSH_KNOWN_HOSTS_FILE)
1258
      for to_node, to_result in result.iteritems():
1259
        if to_result.failed or not to_result.data:
1260
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1261

    
1262
    finally:
1263
      result = self.rpc.call_node_start_master(master, False)
1264
      if result.failed or not result.data:
1265
        self.LogWarning("Could not re-enable the master role on"
1266
                        " the master, please restart manually.")
1267

    
1268

    
1269
def _RecursiveCheckIfLVMBased(disk):
1270
  """Check if the given disk or its children are lvm-based.
1271

1272
  @type disk: L{objects.Disk}
1273
  @param disk: the disk to check
1274
  @rtype: booleean
1275
  @return: boolean indicating whether a LD_LV dev_type was found or not
1276

1277
  """
1278
  if disk.children:
1279
    for chdisk in disk.children:
1280
      if _RecursiveCheckIfLVMBased(chdisk):
1281
        return True
1282
  return disk.dev_type == constants.LD_LV
1283

    
1284

    
1285
class LUSetClusterParams(LogicalUnit):
1286
  """Change the parameters of the cluster.
1287

1288
  """
1289
  HPATH = "cluster-modify"
1290
  HTYPE = constants.HTYPE_CLUSTER
1291
  _OP_REQP = []
1292
  REQ_BGL = False
1293

    
1294
  def CheckParameters(self):
1295
    """Check parameters
1296

1297
    """
1298
    if not hasattr(self.op, "candidate_pool_size"):
1299
      self.op.candidate_pool_size = None
1300
    if self.op.candidate_pool_size is not None:
1301
      try:
1302
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1303
      except ValueError, err:
1304
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1305
                                   str(err))
1306
      if self.op.candidate_pool_size < 1:
1307
        raise errors.OpPrereqError("At least one master candidate needed")
1308

    
1309
  def ExpandNames(self):
1310
    # FIXME: in the future maybe other cluster params won't require checking on
1311
    # all nodes to be modified.
1312
    self.needed_locks = {
1313
      locking.LEVEL_NODE: locking.ALL_SET,
1314
    }
1315
    self.share_locks[locking.LEVEL_NODE] = 1
1316

    
1317
  def BuildHooksEnv(self):
1318
    """Build hooks env.
1319

1320
    """
1321
    env = {
1322
      "OP_TARGET": self.cfg.GetClusterName(),
1323
      "NEW_VG_NAME": self.op.vg_name,
1324
      }
1325
    mn = self.cfg.GetMasterNode()
1326
    return env, [mn], [mn]
1327

    
1328
  def CheckPrereq(self):
1329
    """Check prerequisites.
1330

1331
    This checks whether the given params don't conflict and
1332
    if the given volume group is valid.
1333

1334
    """
1335
    # FIXME: This only works because there is only one parameter that can be
1336
    # changed or removed.
1337
    if self.op.vg_name is not None and not self.op.vg_name:
1338
      instances = self.cfg.GetAllInstancesInfo().values()
1339
      for inst in instances:
1340
        for disk in inst.disks:
1341
          if _RecursiveCheckIfLVMBased(disk):
1342
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1343
                                       " lvm-based instances exist")
1344

    
1345
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1346

    
1347
    # if vg_name not None, checks given volume group on all nodes
1348
    if self.op.vg_name:
1349
      vglist = self.rpc.call_vg_list(node_list)
1350
      for node in node_list:
1351
        if vglist[node].failed:
1352
          # ignoring down node
1353
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1354
          continue
1355
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1356
                                              self.op.vg_name,
1357
                                              constants.MIN_VG_SIZE)
1358
        if vgstatus:
1359
          raise errors.OpPrereqError("Error on node '%s': %s" %
1360
                                     (node, vgstatus))
1361

    
1362
    self.cluster = cluster = self.cfg.GetClusterInfo()
1363
    # validate beparams changes
1364
    if self.op.beparams:
1365
      utils.CheckBEParams(self.op.beparams)
1366
      self.new_beparams = cluster.FillDict(
1367
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1368

    
1369
    # hypervisor list/parameters
1370
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1371
    if self.op.hvparams:
1372
      if not isinstance(self.op.hvparams, dict):
1373
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1374
      for hv_name, hv_dict in self.op.hvparams.items():
1375
        if hv_name not in self.new_hvparams:
1376
          self.new_hvparams[hv_name] = hv_dict
1377
        else:
1378
          self.new_hvparams[hv_name].update(hv_dict)
1379

    
1380
    if self.op.enabled_hypervisors is not None:
1381
      self.hv_list = self.op.enabled_hypervisors
1382
    else:
1383
      self.hv_list = cluster.enabled_hypervisors
1384

    
1385
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1386
      # either the enabled list has changed, or the parameters have, validate
1387
      for hv_name, hv_params in self.new_hvparams.items():
1388
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1389
            (self.op.enabled_hypervisors and
1390
             hv_name in self.op.enabled_hypervisors)):
1391
          # either this is a new hypervisor, or its parameters have changed
1392
          hv_class = hypervisor.GetHypervisor(hv_name)
1393
          hv_class.CheckParameterSyntax(hv_params)
1394
          _CheckHVParams(self, node_list, hv_name, hv_params)
1395

    
1396
  def Exec(self, feedback_fn):
1397
    """Change the parameters of the cluster.
1398

1399
    """
1400
    if self.op.vg_name is not None:
1401
      if self.op.vg_name != self.cfg.GetVGName():
1402
        self.cfg.SetVGName(self.op.vg_name)
1403
      else:
1404
        feedback_fn("Cluster LVM configuration already in desired"
1405
                    " state, not changing")
1406
    if self.op.hvparams:
1407
      self.cluster.hvparams = self.new_hvparams
1408
    if self.op.enabled_hypervisors is not None:
1409
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1410
    if self.op.beparams:
1411
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1412
    if self.op.candidate_pool_size is not None:
1413
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1414

    
1415
    self.cfg.Update(self.cluster)
1416

    
1417
    # we want to update nodes after the cluster so that if any errors
1418
    # happen, we have recorded and saved the cluster info
1419
    if self.op.candidate_pool_size is not None:
1420
      _AdjustCandidatePool(self)
1421

    
1422

    
1423
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1424
  """Sleep and poll for an instance's disk to sync.
1425

1426
  """
1427
  if not instance.disks:
1428
    return True
1429

    
1430
  if not oneshot:
1431
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1432

    
1433
  node = instance.primary_node
1434

    
1435
  for dev in instance.disks:
1436
    lu.cfg.SetDiskID(dev, node)
1437

    
1438
  retries = 0
1439
  while True:
1440
    max_time = 0
1441
    done = True
1442
    cumul_degraded = False
1443
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1444
    if rstats.failed or not rstats.data:
1445
      lu.LogWarning("Can't get any data from node %s", node)
1446
      retries += 1
1447
      if retries >= 10:
1448
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1449
                                 " aborting." % node)
1450
      time.sleep(6)
1451
      continue
1452
    rstats = rstats.data
1453
    retries = 0
1454
    for i in range(len(rstats)):
1455
      mstat = rstats[i]
1456
      if mstat is None:
1457
        lu.LogWarning("Can't compute data for node %s/%s",
1458
                           node, instance.disks[i].iv_name)
1459
        continue
1460
      # we ignore the ldisk parameter
1461
      perc_done, est_time, is_degraded, _ = mstat
1462
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1463
      if perc_done is not None:
1464
        done = False
1465
        if est_time is not None:
1466
          rem_time = "%d estimated seconds remaining" % est_time
1467
          max_time = est_time
1468
        else:
1469
          rem_time = "no time estimate"
1470
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1471
                        (instance.disks[i].iv_name, perc_done, rem_time))
1472
    if done or oneshot:
1473
      break
1474

    
1475
    time.sleep(min(60, max_time))
1476

    
1477
  if done:
1478
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1479
  return not cumul_degraded
1480

    
1481

    
1482
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1483
  """Check that mirrors are not degraded.
1484

1485
  The ldisk parameter, if True, will change the test from the
1486
  is_degraded attribute (which represents overall non-ok status for
1487
  the device(s)) to the ldisk (representing the local storage status).
1488

1489
  """
1490
  lu.cfg.SetDiskID(dev, node)
1491
  if ldisk:
1492
    idx = 6
1493
  else:
1494
    idx = 5
1495

    
1496
  result = True
1497
  if on_primary or dev.AssembleOnSecondary():
1498
    rstats = lu.rpc.call_blockdev_find(node, dev)
1499
    if rstats.failed or not rstats.data:
1500
      logging.warning("Node %s: disk degraded, not found or node down", node)
1501
      result = False
1502
    else:
1503
      result = result and (not rstats.data[idx])
1504
  if dev.children:
1505
    for child in dev.children:
1506
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1507

    
1508
  return result
1509

    
1510

    
1511
class LUDiagnoseOS(NoHooksLU):
1512
  """Logical unit for OS diagnose/query.
1513

1514
  """
1515
  _OP_REQP = ["output_fields", "names"]
1516
  REQ_BGL = False
1517
  _FIELDS_STATIC = utils.FieldSet()
1518
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1519

    
1520
  def ExpandNames(self):
1521
    if self.op.names:
1522
      raise errors.OpPrereqError("Selective OS query not supported")
1523

    
1524
    _CheckOutputFields(static=self._FIELDS_STATIC,
1525
                       dynamic=self._FIELDS_DYNAMIC,
1526
                       selected=self.op.output_fields)
1527

    
1528
    # Lock all nodes, in shared mode
1529
    self.needed_locks = {}
1530
    self.share_locks[locking.LEVEL_NODE] = 1
1531
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1532

    
1533
  def CheckPrereq(self):
1534
    """Check prerequisites.
1535

1536
    """
1537

    
1538
  @staticmethod
1539
  def _DiagnoseByOS(node_list, rlist):
1540
    """Remaps a per-node return list into an a per-os per-node dictionary
1541

1542
    @param node_list: a list with the names of all nodes
1543
    @param rlist: a map with node names as keys and OS objects as values
1544

1545
    @rtype: dict
1546
    @returns: a dictionary with osnames as keys and as value another map, with
1547
        nodes as keys and list of OS objects as values, eg::
1548

1549
          {"debian-etch": {"node1": [<object>,...],
1550
                           "node2": [<object>,]}
1551
          }
1552

1553
    """
1554
    all_os = {}
1555
    for node_name, nr in rlist.iteritems():
1556
      if nr.failed or not nr.data:
1557
        continue
1558
      for os_obj in nr.data:
1559
        if os_obj.name not in all_os:
1560
          # build a list of nodes for this os containing empty lists
1561
          # for each node in node_list
1562
          all_os[os_obj.name] = {}
1563
          for nname in node_list:
1564
            all_os[os_obj.name][nname] = []
1565
        all_os[os_obj.name][node_name].append(os_obj)
1566
    return all_os
1567

    
1568
  def Exec(self, feedback_fn):
1569
    """Compute the list of OSes.
1570

1571
    """
1572
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1573
    node_data = self.rpc.call_os_diagnose(node_list)
1574
    if node_data == False:
1575
      raise errors.OpExecError("Can't gather the list of OSes")
1576
    pol = self._DiagnoseByOS(node_list, node_data)
1577
    output = []
1578
    for os_name, os_data in pol.iteritems():
1579
      row = []
1580
      for field in self.op.output_fields:
1581
        if field == "name":
1582
          val = os_name
1583
        elif field == "valid":
1584
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1585
        elif field == "node_status":
1586
          val = {}
1587
          for node_name, nos_list in os_data.iteritems():
1588
            val[node_name] = [(v.status, v.path) for v in nos_list]
1589
        else:
1590
          raise errors.ParameterError(field)
1591
        row.append(val)
1592
      output.append(row)
1593

    
1594
    return output
1595

    
1596

    
1597
class LURemoveNode(LogicalUnit):
1598
  """Logical unit for removing a node.
1599

1600
  """
1601
  HPATH = "node-remove"
1602
  HTYPE = constants.HTYPE_NODE
1603
  _OP_REQP = ["node_name"]
1604

    
1605
  def BuildHooksEnv(self):
1606
    """Build hooks env.
1607

1608
    This doesn't run on the target node in the pre phase as a failed
1609
    node would then be impossible to remove.
1610

1611
    """
1612
    env = {
1613
      "OP_TARGET": self.op.node_name,
1614
      "NODE_NAME": self.op.node_name,
1615
      }
1616
    all_nodes = self.cfg.GetNodeList()
1617
    all_nodes.remove(self.op.node_name)
1618
    return env, all_nodes, all_nodes
1619

    
1620
  def CheckPrereq(self):
1621
    """Check prerequisites.
1622

1623
    This checks:
1624
     - the node exists in the configuration
1625
     - it does not have primary or secondary instances
1626
     - it's not the master
1627

1628
    Any errors are signalled by raising errors.OpPrereqError.
1629

1630
    """
1631
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1632
    if node is None:
1633
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1634

    
1635
    instance_list = self.cfg.GetInstanceList()
1636

    
1637
    masternode = self.cfg.GetMasterNode()
1638
    if node.name == masternode:
1639
      raise errors.OpPrereqError("Node is the master node,"
1640
                                 " you need to failover first.")
1641

    
1642
    for instance_name in instance_list:
1643
      instance = self.cfg.GetInstanceInfo(instance_name)
1644
      if node.name == instance.primary_node:
1645
        raise errors.OpPrereqError("Instance %s still running on the node,"
1646
                                   " please remove first." % instance_name)
1647
      if node.name in instance.secondary_nodes:
1648
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1649
                                   " please remove first." % instance_name)
1650
    self.op.node_name = node.name
1651
    self.node = node
1652

    
1653
  def Exec(self, feedback_fn):
1654
    """Removes the node from the cluster.
1655

1656
    """
1657
    node = self.node
1658
    logging.info("Stopping the node daemon and removing configs from node %s",
1659
                 node.name)
1660

    
1661
    self.context.RemoveNode(node.name)
1662

    
1663
    self.rpc.call_node_leave_cluster(node.name)
1664

    
1665
    # Promote nodes to master candidate as needed
1666
    _AdjustCandidatePool(self)
1667

    
1668

    
1669
class LUQueryNodes(NoHooksLU):
1670
  """Logical unit for querying nodes.
1671

1672
  """
1673
  _OP_REQP = ["output_fields", "names"]
1674
  REQ_BGL = False
1675
  _FIELDS_DYNAMIC = utils.FieldSet(
1676
    "dtotal", "dfree",
1677
    "mtotal", "mnode", "mfree",
1678
    "bootid",
1679
    "ctotal",
1680
    )
1681

    
1682
  _FIELDS_STATIC = utils.FieldSet(
1683
    "name", "pinst_cnt", "sinst_cnt",
1684
    "pinst_list", "sinst_list",
1685
    "pip", "sip", "tags",
1686
    "serial_no",
1687
    "master_candidate",
1688
    "master",
1689
    "offline",
1690
    )
1691

    
1692
  def ExpandNames(self):
1693
    _CheckOutputFields(static=self._FIELDS_STATIC,
1694
                       dynamic=self._FIELDS_DYNAMIC,
1695
                       selected=self.op.output_fields)
1696

    
1697
    self.needed_locks = {}
1698
    self.share_locks[locking.LEVEL_NODE] = 1
1699

    
1700
    if self.op.names:
1701
      self.wanted = _GetWantedNodes(self, self.op.names)
1702
    else:
1703
      self.wanted = locking.ALL_SET
1704

    
1705
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1706
    if self.do_locking:
1707
      # if we don't request only static fields, we need to lock the nodes
1708
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1709

    
1710

    
1711
  def CheckPrereq(self):
1712
    """Check prerequisites.
1713

1714
    """
1715
    # The validation of the node list is done in the _GetWantedNodes,
1716
    # if non empty, and if empty, there's no validation to do
1717
    pass
1718

    
1719
  def Exec(self, feedback_fn):
1720
    """Computes the list of nodes and their attributes.
1721

1722
    """
1723
    all_info = self.cfg.GetAllNodesInfo()
1724
    if self.do_locking:
1725
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1726
    elif self.wanted != locking.ALL_SET:
1727
      nodenames = self.wanted
1728
      missing = set(nodenames).difference(all_info.keys())
1729
      if missing:
1730
        raise errors.OpExecError(
1731
          "Some nodes were removed before retrieving their data: %s" % missing)
1732
    else:
1733
      nodenames = all_info.keys()
1734

    
1735
    nodenames = utils.NiceSort(nodenames)
1736
    nodelist = [all_info[name] for name in nodenames]
1737

    
1738
    # begin data gathering
1739

    
1740
    if self.do_locking:
1741
      live_data = {}
1742
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1743
                                          self.cfg.GetHypervisorType())
1744
      for name in nodenames:
1745
        nodeinfo = node_data[name]
1746
        if not nodeinfo.failed and nodeinfo.data:
1747
          nodeinfo = nodeinfo.data
1748
          fn = utils.TryConvert
1749
          live_data[name] = {
1750
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1751
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1752
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1753
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1754
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1755
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1756
            "bootid": nodeinfo.get('bootid', None),
1757
            }
1758
        else:
1759
          live_data[name] = {}
1760
    else:
1761
      live_data = dict.fromkeys(nodenames, {})
1762

    
1763
    node_to_primary = dict([(name, set()) for name in nodenames])
1764
    node_to_secondary = dict([(name, set()) for name in nodenames])
1765

    
1766
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1767
                             "sinst_cnt", "sinst_list"))
1768
    if inst_fields & frozenset(self.op.output_fields):
1769
      instancelist = self.cfg.GetInstanceList()
1770

    
1771
      for instance_name in instancelist:
1772
        inst = self.cfg.GetInstanceInfo(instance_name)
1773
        if inst.primary_node in node_to_primary:
1774
          node_to_primary[inst.primary_node].add(inst.name)
1775
        for secnode in inst.secondary_nodes:
1776
          if secnode in node_to_secondary:
1777
            node_to_secondary[secnode].add(inst.name)
1778

    
1779
    master_node = self.cfg.GetMasterNode()
1780

    
1781
    # end data gathering
1782

    
1783
    output = []
1784
    for node in nodelist:
1785
      node_output = []
1786
      for field in self.op.output_fields:
1787
        if field == "name":
1788
          val = node.name
1789
        elif field == "pinst_list":
1790
          val = list(node_to_primary[node.name])
1791
        elif field == "sinst_list":
1792
          val = list(node_to_secondary[node.name])
1793
        elif field == "pinst_cnt":
1794
          val = len(node_to_primary[node.name])
1795
        elif field == "sinst_cnt":
1796
          val = len(node_to_secondary[node.name])
1797
        elif field == "pip":
1798
          val = node.primary_ip
1799
        elif field == "sip":
1800
          val = node.secondary_ip
1801
        elif field == "tags":
1802
          val = list(node.GetTags())
1803
        elif field == "serial_no":
1804
          val = node.serial_no
1805
        elif field == "master_candidate":
1806
          val = node.master_candidate
1807
        elif field == "master":
1808
          val = node.name == master_node
1809
        elif field == "offline":
1810
          val = node.offline
1811
        elif self._FIELDS_DYNAMIC.Matches(field):
1812
          val = live_data[node.name].get(field, None)
1813
        else:
1814
          raise errors.ParameterError(field)
1815
        node_output.append(val)
1816
      output.append(node_output)
1817

    
1818
    return output
1819

    
1820

    
1821
class LUQueryNodeVolumes(NoHooksLU):
1822
  """Logical unit for getting volumes on node(s).
1823

1824
  """
1825
  _OP_REQP = ["nodes", "output_fields"]
1826
  REQ_BGL = False
1827
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1828
  _FIELDS_STATIC = utils.FieldSet("node")
1829

    
1830
  def ExpandNames(self):
1831
    _CheckOutputFields(static=self._FIELDS_STATIC,
1832
                       dynamic=self._FIELDS_DYNAMIC,
1833
                       selected=self.op.output_fields)
1834

    
1835
    self.needed_locks = {}
1836
    self.share_locks[locking.LEVEL_NODE] = 1
1837
    if not self.op.nodes:
1838
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1839
    else:
1840
      self.needed_locks[locking.LEVEL_NODE] = \
1841
        _GetWantedNodes(self, self.op.nodes)
1842

    
1843
  def CheckPrereq(self):
1844
    """Check prerequisites.
1845

1846
    This checks that the fields required are valid output fields.
1847

1848
    """
1849
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1850

    
1851
  def Exec(self, feedback_fn):
1852
    """Computes the list of nodes and their attributes.
1853

1854
    """
1855
    nodenames = self.nodes
1856
    volumes = self.rpc.call_node_volumes(nodenames)
1857

    
1858
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1859
             in self.cfg.GetInstanceList()]
1860

    
1861
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1862

    
1863
    output = []
1864
    for node in nodenames:
1865
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1866
        continue
1867

    
1868
      node_vols = volumes[node].data[:]
1869
      node_vols.sort(key=lambda vol: vol['dev'])
1870

    
1871
      for vol in node_vols:
1872
        node_output = []
1873
        for field in self.op.output_fields:
1874
          if field == "node":
1875
            val = node
1876
          elif field == "phys":
1877
            val = vol['dev']
1878
          elif field == "vg":
1879
            val = vol['vg']
1880
          elif field == "name":
1881
            val = vol['name']
1882
          elif field == "size":
1883
            val = int(float(vol['size']))
1884
          elif field == "instance":
1885
            for inst in ilist:
1886
              if node not in lv_by_node[inst]:
1887
                continue
1888
              if vol['name'] in lv_by_node[inst][node]:
1889
                val = inst.name
1890
                break
1891
            else:
1892
              val = '-'
1893
          else:
1894
            raise errors.ParameterError(field)
1895
          node_output.append(str(val))
1896

    
1897
        output.append(node_output)
1898

    
1899
    return output
1900

    
1901

    
1902
class LUAddNode(LogicalUnit):
1903
  """Logical unit for adding node to the cluster.
1904

1905
  """
1906
  HPATH = "node-add"
1907
  HTYPE = constants.HTYPE_NODE
1908
  _OP_REQP = ["node_name"]
1909

    
1910
  def BuildHooksEnv(self):
1911
    """Build hooks env.
1912

1913
    This will run on all nodes before, and on all nodes + the new node after.
1914

1915
    """
1916
    env = {
1917
      "OP_TARGET": self.op.node_name,
1918
      "NODE_NAME": self.op.node_name,
1919
      "NODE_PIP": self.op.primary_ip,
1920
      "NODE_SIP": self.op.secondary_ip,
1921
      }
1922
    nodes_0 = self.cfg.GetNodeList()
1923
    nodes_1 = nodes_0 + [self.op.node_name, ]
1924
    return env, nodes_0, nodes_1
1925

    
1926
  def CheckPrereq(self):
1927
    """Check prerequisites.
1928

1929
    This checks:
1930
     - the new node is not already in the config
1931
     - it is resolvable
1932
     - its parameters (single/dual homed) matches the cluster
1933

1934
    Any errors are signalled by raising errors.OpPrereqError.
1935

1936
    """
1937
    node_name = self.op.node_name
1938
    cfg = self.cfg
1939

    
1940
    dns_data = utils.HostInfo(node_name)
1941

    
1942
    node = dns_data.name
1943
    primary_ip = self.op.primary_ip = dns_data.ip
1944
    secondary_ip = getattr(self.op, "secondary_ip", None)
1945
    if secondary_ip is None:
1946
      secondary_ip = primary_ip
1947
    if not utils.IsValidIP(secondary_ip):
1948
      raise errors.OpPrereqError("Invalid secondary IP given")
1949
    self.op.secondary_ip = secondary_ip
1950

    
1951
    node_list = cfg.GetNodeList()
1952
    if not self.op.readd and node in node_list:
1953
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1954
                                 node)
1955
    elif self.op.readd and node not in node_list:
1956
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1957

    
1958
    for existing_node_name in node_list:
1959
      existing_node = cfg.GetNodeInfo(existing_node_name)
1960

    
1961
      if self.op.readd and node == existing_node_name:
1962
        if (existing_node.primary_ip != primary_ip or
1963
            existing_node.secondary_ip != secondary_ip):
1964
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1965
                                     " address configuration as before")
1966
        continue
1967

    
1968
      if (existing_node.primary_ip == primary_ip or
1969
          existing_node.secondary_ip == primary_ip or
1970
          existing_node.primary_ip == secondary_ip or
1971
          existing_node.secondary_ip == secondary_ip):
1972
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1973
                                   " existing node %s" % existing_node.name)
1974

    
1975
    # check that the type of the node (single versus dual homed) is the
1976
    # same as for the master
1977
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1978
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1979
    newbie_singlehomed = secondary_ip == primary_ip
1980
    if master_singlehomed != newbie_singlehomed:
1981
      if master_singlehomed:
1982
        raise errors.OpPrereqError("The master has no private ip but the"
1983
                                   " new node has one")
1984
      else:
1985
        raise errors.OpPrereqError("The master has a private ip but the"
1986
                                   " new node doesn't have one")
1987

    
1988
    # checks reachablity
1989
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1990
      raise errors.OpPrereqError("Node not reachable by ping")
1991

    
1992
    if not newbie_singlehomed:
1993
      # check reachability from my secondary ip to newbie's secondary ip
1994
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1995
                           source=myself.secondary_ip):
1996
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1997
                                   " based ping to noded port")
1998

    
1999
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2000
    node_info = self.cfg.GetAllNodesInfo().values()
2001
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2002
    master_candidate = mc_now < cp_size
2003

    
2004
    self.new_node = objects.Node(name=node,
2005
                                 primary_ip=primary_ip,
2006
                                 secondary_ip=secondary_ip,
2007
                                 master_candidate=master_candidate,
2008
                                 offline=False)
2009

    
2010
  def Exec(self, feedback_fn):
2011
    """Adds the new node to the cluster.
2012

2013
    """
2014
    new_node = self.new_node
2015
    node = new_node.name
2016

    
2017
    # check connectivity
2018
    result = self.rpc.call_version([node])[node]
2019
    result.Raise()
2020
    if result.data:
2021
      if constants.PROTOCOL_VERSION == result.data:
2022
        logging.info("Communication to node %s fine, sw version %s match",
2023
                     node, result.data)
2024
      else:
2025
        raise errors.OpExecError("Version mismatch master version %s,"
2026
                                 " node version %s" %
2027
                                 (constants.PROTOCOL_VERSION, result.data))
2028
    else:
2029
      raise errors.OpExecError("Cannot get version from the new node")
2030

    
2031
    # setup ssh on node
2032
    logging.info("Copy ssh key to node %s", node)
2033
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2034
    keyarray = []
2035
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2036
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2037
                priv_key, pub_key]
2038

    
2039
    for i in keyfiles:
2040
      f = open(i, 'r')
2041
      try:
2042
        keyarray.append(f.read())
2043
      finally:
2044
        f.close()
2045

    
2046
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2047
                                    keyarray[2],
2048
                                    keyarray[3], keyarray[4], keyarray[5])
2049

    
2050
    if result.failed or not result.data:
2051
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2052

    
2053
    # Add node to our /etc/hosts, and add key to known_hosts
2054
    utils.AddHostToEtcHosts(new_node.name)
2055

    
2056
    if new_node.secondary_ip != new_node.primary_ip:
2057
      result = self.rpc.call_node_has_ip_address(new_node.name,
2058
                                                 new_node.secondary_ip)
2059
      if result.failed or not result.data:
2060
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2061
                                 " you gave (%s). Please fix and re-run this"
2062
                                 " command." % new_node.secondary_ip)
2063

    
2064
    node_verify_list = [self.cfg.GetMasterNode()]
2065
    node_verify_param = {
2066
      'nodelist': [node],
2067
      # TODO: do a node-net-test as well?
2068
    }
2069

    
2070
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2071
                                       self.cfg.GetClusterName())
2072
    for verifier in node_verify_list:
2073
      if result[verifier].failed or not result[verifier].data:
2074
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2075
                                 " for remote verification" % verifier)
2076
      if result[verifier].data['nodelist']:
2077
        for failed in result[verifier].data['nodelist']:
2078
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2079
                      (verifier, result[verifier]['nodelist'][failed]))
2080
        raise errors.OpExecError("ssh/hostname verification failed.")
2081

    
2082
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2083
    # including the node just added
2084
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2085
    dist_nodes = self.cfg.GetNodeList()
2086
    if not self.op.readd:
2087
      dist_nodes.append(node)
2088
    if myself.name in dist_nodes:
2089
      dist_nodes.remove(myself.name)
2090

    
2091
    logging.debug("Copying hosts and known_hosts to all nodes")
2092
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2093
      result = self.rpc.call_upload_file(dist_nodes, fname)
2094
      for to_node, to_result in result.iteritems():
2095
        if to_result.failed or not to_result.data:
2096
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2097

    
2098
    to_copy = []
2099
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2100
      to_copy.append(constants.VNC_PASSWORD_FILE)
2101
    for fname in to_copy:
2102
      result = self.rpc.call_upload_file([node], fname)
2103
      if result[node].failed or not result[node]:
2104
        logging.error("Could not copy file %s to node %s", fname, node)
2105

    
2106
    if self.op.readd:
2107
      self.context.ReaddNode(new_node)
2108
    else:
2109
      self.context.AddNode(new_node)
2110

    
2111

    
2112
class LUSetNodeParams(LogicalUnit):
2113
  """Modifies the parameters of a node.
2114

2115
  """
2116
  HPATH = "node-modify"
2117
  HTYPE = constants.HTYPE_NODE
2118
  _OP_REQP = ["node_name"]
2119
  REQ_BGL = False
2120

    
2121
  def CheckArguments(self):
2122
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2123
    if node_name is None:
2124
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2125
    self.op.node_name = node_name
2126
    _CheckBooleanOpField(self.op, 'master_candidate')
2127
    _CheckBooleanOpField(self.op, 'offline')
2128
    if self.op.master_candidate is None and self.op.offline is None:
2129
      raise errors.OpPrereqError("Please pass at least one modification")
2130
    if self.op.offline == True and self.op.master_candidate == True:
2131
      raise errors.OpPrereqError("Can't set the node into offline and"
2132
                                 " master_candidate at the same time")
2133

    
2134
  def ExpandNames(self):
2135
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2136

    
2137
  def BuildHooksEnv(self):
2138
    """Build hooks env.
2139

2140
    This runs on the master node.
2141

2142
    """
2143
    env = {
2144
      "OP_TARGET": self.op.node_name,
2145
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2146
      "OFFLINE": str(self.op.offline),
2147
      }
2148
    nl = [self.cfg.GetMasterNode(),
2149
          self.op.node_name]
2150
    return env, nl, nl
2151

    
2152
  def CheckPrereq(self):
2153
    """Check prerequisites.
2154

2155
    This only checks the instance list against the existing names.
2156

2157
    """
2158
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2159

    
2160
    if ((self.op.master_candidate == False or self.op.offline == True)
2161
        and node.master_candidate):
2162
      # we will demote the node from master_candidate
2163
      if self.op.node_name == self.cfg.GetMasterNode():
2164
        raise errors.OpPrereqError("The master node has to be a"
2165
                                   " master candidate and online")
2166
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2167
      node_info = self.cfg.GetAllNodesInfo().values()
2168
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2169
      if num_candidates <= cp_size:
2170
        msg = ("Not enough master candidates (desired"
2171
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2172
        if self.op.force:
2173
          self.LogWarning(msg)
2174
        else:
2175
          raise errors.OpPrereqError(msg)
2176

    
2177
    if (self.op.master_candidate == True and node.offline and
2178
        not self.op.offline == False):
2179
      raise errors.OpPrereqError("Can't set an offline node to"
2180
                                 " master_candidate")
2181

    
2182
    return
2183

    
2184
  def Exec(self, feedback_fn):
2185
    """Modifies a node.
2186

2187
    """
2188
    node = self.node
2189

    
2190
    result = []
2191

    
2192
    if self.op.offline is not None:
2193
      node.offline = self.op.offline
2194
      result.append(("offline", str(self.op.offline)))
2195
      if self.op.offline == True and node.master_candidate:
2196
        node.master_candidate = False
2197
        result.append(("master_candidate", "auto-demotion due to offline"))
2198

    
2199
    if self.op.master_candidate is not None:
2200
      node.master_candidate = self.op.master_candidate
2201
      result.append(("master_candidate", str(self.op.master_candidate)))
2202
      if self.op.master_candidate == False:
2203
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2204
        if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2205
            or len(rrc.data) != 2):
2206
          self.LogWarning("Node rpc error: %s" % rrc.error)
2207
        elif not rrc.data[0]:
2208
          self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2209

    
2210
    # this will trigger configuration file update, if needed
2211
    self.cfg.Update(node)
2212
    # this will trigger job queue propagation or cleanup
2213
    if self.op.node_name != self.cfg.GetMasterNode():
2214
      self.context.ReaddNode(node)
2215

    
2216
    return result
2217

    
2218

    
2219
class LUQueryClusterInfo(NoHooksLU):
2220
  """Query cluster configuration.
2221

2222
  """
2223
  _OP_REQP = []
2224
  REQ_BGL = False
2225

    
2226
  def ExpandNames(self):
2227
    self.needed_locks = {}
2228

    
2229
  def CheckPrereq(self):
2230
    """No prerequsites needed for this LU.
2231

2232
    """
2233
    pass
2234

    
2235
  def Exec(self, feedback_fn):
2236
    """Return cluster config.
2237

2238
    """
2239
    cluster = self.cfg.GetClusterInfo()
2240
    result = {
2241
      "software_version": constants.RELEASE_VERSION,
2242
      "protocol_version": constants.PROTOCOL_VERSION,
2243
      "config_version": constants.CONFIG_VERSION,
2244
      "os_api_version": constants.OS_API_VERSION,
2245
      "export_version": constants.EXPORT_VERSION,
2246
      "architecture": (platform.architecture()[0], platform.machine()),
2247
      "name": cluster.cluster_name,
2248
      "master": cluster.master_node,
2249
      "default_hypervisor": cluster.default_hypervisor,
2250
      "enabled_hypervisors": cluster.enabled_hypervisors,
2251
      "hvparams": cluster.hvparams,
2252
      "beparams": cluster.beparams,
2253
      "candidate_pool_size": cluster.candidate_pool_size,
2254
      }
2255

    
2256
    return result
2257

    
2258

    
2259
class LUQueryConfigValues(NoHooksLU):
2260
  """Return configuration values.
2261

2262
  """
2263
  _OP_REQP = []
2264
  REQ_BGL = False
2265
  _FIELDS_DYNAMIC = utils.FieldSet()
2266
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2267

    
2268
  def ExpandNames(self):
2269
    self.needed_locks = {}
2270

    
2271
    _CheckOutputFields(static=self._FIELDS_STATIC,
2272
                       dynamic=self._FIELDS_DYNAMIC,
2273
                       selected=self.op.output_fields)
2274

    
2275
  def CheckPrereq(self):
2276
    """No prerequisites.
2277

2278
    """
2279
    pass
2280

    
2281
  def Exec(self, feedback_fn):
2282
    """Dump a representation of the cluster config to the standard output.
2283

2284
    """
2285
    values = []
2286
    for field in self.op.output_fields:
2287
      if field == "cluster_name":
2288
        entry = self.cfg.GetClusterName()
2289
      elif field == "master_node":
2290
        entry = self.cfg.GetMasterNode()
2291
      elif field == "drain_flag":
2292
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2293
      else:
2294
        raise errors.ParameterError(field)
2295
      values.append(entry)
2296
    return values
2297

    
2298

    
2299
class LUActivateInstanceDisks(NoHooksLU):
2300
  """Bring up an instance's disks.
2301

2302
  """
2303
  _OP_REQP = ["instance_name"]
2304
  REQ_BGL = False
2305

    
2306
  def ExpandNames(self):
2307
    self._ExpandAndLockInstance()
2308
    self.needed_locks[locking.LEVEL_NODE] = []
2309
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2310

    
2311
  def DeclareLocks(self, level):
2312
    if level == locking.LEVEL_NODE:
2313
      self._LockInstancesNodes()
2314

    
2315
  def CheckPrereq(self):
2316
    """Check prerequisites.
2317

2318
    This checks that the instance is in the cluster.
2319

2320
    """
2321
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2322
    assert self.instance is not None, \
2323
      "Cannot retrieve locked instance %s" % self.op.instance_name
2324
    _CheckNodeOnline(self, instance.primary_node)
2325

    
2326
  def Exec(self, feedback_fn):
2327
    """Activate the disks.
2328

2329
    """
2330
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2331
    if not disks_ok:
2332
      raise errors.OpExecError("Cannot activate block devices")
2333

    
2334
    return disks_info
2335

    
2336

    
2337
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2338
  """Prepare the block devices for an instance.
2339

2340
  This sets up the block devices on all nodes.
2341

2342
  @type lu: L{LogicalUnit}
2343
  @param lu: the logical unit on whose behalf we execute
2344
  @type instance: L{objects.Instance}
2345
  @param instance: the instance for whose disks we assemble
2346
  @type ignore_secondaries: boolean
2347
  @param ignore_secondaries: if true, errors on secondary nodes
2348
      won't result in an error return from the function
2349
  @return: False if the operation failed, otherwise a list of
2350
      (host, instance_visible_name, node_visible_name)
2351
      with the mapping from node devices to instance devices
2352

2353
  """
2354
  device_info = []
2355
  disks_ok = True
2356
  iname = instance.name
2357
  # With the two passes mechanism we try to reduce the window of
2358
  # opportunity for the race condition of switching DRBD to primary
2359
  # before handshaking occured, but we do not eliminate it
2360

    
2361
  # The proper fix would be to wait (with some limits) until the
2362
  # connection has been made and drbd transitions from WFConnection
2363
  # into any other network-connected state (Connected, SyncTarget,
2364
  # SyncSource, etc.)
2365

    
2366
  # 1st pass, assemble on all nodes in secondary mode
2367
  for inst_disk in instance.disks:
2368
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2369
      lu.cfg.SetDiskID(node_disk, node)
2370
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2371
      if result.failed or not result:
2372
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2373
                           " (is_primary=False, pass=1)",
2374
                           inst_disk.iv_name, node)
2375
        if not ignore_secondaries:
2376
          disks_ok = False
2377

    
2378
  # FIXME: race condition on drbd migration to primary
2379

    
2380
  # 2nd pass, do only the primary node
2381
  for inst_disk in instance.disks:
2382
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2383
      if node != instance.primary_node:
2384
        continue
2385
      lu.cfg.SetDiskID(node_disk, node)
2386
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2387
      if result.failed or not result:
2388
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2389
                           " (is_primary=True, pass=2)",
2390
                           inst_disk.iv_name, node)
2391
        disks_ok = False
2392
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2393

    
2394
  # leave the disks configured for the primary node
2395
  # this is a workaround that would be fixed better by
2396
  # improving the logical/physical id handling
2397
  for disk in instance.disks:
2398
    lu.cfg.SetDiskID(disk, instance.primary_node)
2399

    
2400
  return disks_ok, device_info
2401

    
2402

    
2403
def _StartInstanceDisks(lu, instance, force):
2404
  """Start the disks of an instance.
2405

2406
  """
2407
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2408
                                           ignore_secondaries=force)
2409
  if not disks_ok:
2410
    _ShutdownInstanceDisks(lu, instance)
2411
    if force is not None and not force:
2412
      lu.proc.LogWarning("", hint="If the message above refers to a"
2413
                         " secondary node,"
2414
                         " you can retry the operation using '--force'.")
2415
    raise errors.OpExecError("Disk consistency error")
2416

    
2417

    
2418
class LUDeactivateInstanceDisks(NoHooksLU):
2419
  """Shutdown an instance's disks.
2420

2421
  """
2422
  _OP_REQP = ["instance_name"]
2423
  REQ_BGL = False
2424

    
2425
  def ExpandNames(self):
2426
    self._ExpandAndLockInstance()
2427
    self.needed_locks[locking.LEVEL_NODE] = []
2428
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2429

    
2430
  def DeclareLocks(self, level):
2431
    if level == locking.LEVEL_NODE:
2432
      self._LockInstancesNodes()
2433

    
2434
  def CheckPrereq(self):
2435
    """Check prerequisites.
2436

2437
    This checks that the instance is in the cluster.
2438

2439
    """
2440
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2441
    assert self.instance is not None, \
2442
      "Cannot retrieve locked instance %s" % self.op.instance_name
2443

    
2444
  def Exec(self, feedback_fn):
2445
    """Deactivate the disks
2446

2447
    """
2448
    instance = self.instance
2449
    _SafeShutdownInstanceDisks(self, instance)
2450

    
2451

    
2452
def _SafeShutdownInstanceDisks(lu, instance):
2453
  """Shutdown block devices of an instance.
2454

2455
  This function checks if an instance is running, before calling
2456
  _ShutdownInstanceDisks.
2457

2458
  """
2459
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2460
                                      [instance.hypervisor])
2461
  ins_l = ins_l[instance.primary_node]
2462
  if ins_l.failed or not isinstance(ins_l.data, list):
2463
    raise errors.OpExecError("Can't contact node '%s'" %
2464
                             instance.primary_node)
2465

    
2466
  if instance.name in ins_l.data:
2467
    raise errors.OpExecError("Instance is running, can't shutdown"
2468
                             " block devices.")
2469

    
2470
  _ShutdownInstanceDisks(lu, instance)
2471

    
2472

    
2473
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2474
  """Shutdown block devices of an instance.
2475

2476
  This does the shutdown on all nodes of the instance.
2477

2478
  If the ignore_primary is false, errors on the primary node are
2479
  ignored.
2480

2481
  """
2482
  result = True
2483
  for disk in instance.disks:
2484
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2485
      lu.cfg.SetDiskID(top_disk, node)
2486
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2487
      if result.failed or not result.data:
2488
        logging.error("Could not shutdown block device %s on node %s",
2489
                      disk.iv_name, node)
2490
        if not ignore_primary or node != instance.primary_node:
2491
          result = False
2492
  return result
2493

    
2494

    
2495
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2496
  """Checks if a node has enough free memory.
2497

2498
  This function check if a given node has the needed amount of free
2499
  memory. In case the node has less memory or we cannot get the
2500
  information from the node, this function raise an OpPrereqError
2501
  exception.
2502

2503
  @type lu: C{LogicalUnit}
2504
  @param lu: a logical unit from which we get configuration data
2505
  @type node: C{str}
2506
  @param node: the node to check
2507
  @type reason: C{str}
2508
  @param reason: string to use in the error message
2509
  @type requested: C{int}
2510
  @param requested: the amount of memory in MiB to check for
2511
  @type hypervisor: C{str}
2512
  @param hypervisor: the hypervisor to ask for memory stats
2513
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2514
      we cannot check the node
2515

2516
  """
2517
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2518
  nodeinfo[node].Raise()
2519
  free_mem = nodeinfo[node].data.get('memory_free')
2520
  if not isinstance(free_mem, int):
2521
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2522
                             " was '%s'" % (node, free_mem))
2523
  if requested > free_mem:
2524
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2525
                             " needed %s MiB, available %s MiB" %
2526
                             (node, reason, requested, free_mem))
2527

    
2528

    
2529
class LUStartupInstance(LogicalUnit):
2530
  """Starts an instance.
2531

2532
  """
2533
  HPATH = "instance-start"
2534
  HTYPE = constants.HTYPE_INSTANCE
2535
  _OP_REQP = ["instance_name", "force"]
2536
  REQ_BGL = False
2537

    
2538
  def ExpandNames(self):
2539
    self._ExpandAndLockInstance()
2540

    
2541
  def BuildHooksEnv(self):
2542
    """Build hooks env.
2543

2544
    This runs on master, primary and secondary nodes of the instance.
2545

2546
    """
2547
    env = {
2548
      "FORCE": self.op.force,
2549
      }
2550
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2551
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2552
          list(self.instance.secondary_nodes))
2553
    return env, nl, nl
2554

    
2555
  def CheckPrereq(self):
2556
    """Check prerequisites.
2557

2558
    This checks that the instance is in the cluster.
2559

2560
    """
2561
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2562
    assert self.instance is not None, \
2563
      "Cannot retrieve locked instance %s" % self.op.instance_name
2564

    
2565
    _CheckNodeOnline(self, instance.primary_node)
2566

    
2567
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2568
    # check bridges existance
2569
    _CheckInstanceBridgesExist(self, instance)
2570

    
2571
    _CheckNodeFreeMemory(self, instance.primary_node,
2572
                         "starting instance %s" % instance.name,
2573
                         bep[constants.BE_MEMORY], instance.hypervisor)
2574

    
2575
  def Exec(self, feedback_fn):
2576
    """Start the instance.
2577

2578
    """
2579
    instance = self.instance
2580
    force = self.op.force
2581
    extra_args = getattr(self.op, "extra_args", "")
2582

    
2583
    self.cfg.MarkInstanceUp(instance.name)
2584

    
2585
    node_current = instance.primary_node
2586

    
2587
    _StartInstanceDisks(self, instance, force)
2588

    
2589
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2590
    if result.failed or not result.data:
2591
      _ShutdownInstanceDisks(self, instance)
2592
      raise errors.OpExecError("Could not start instance")
2593

    
2594

    
2595
class LURebootInstance(LogicalUnit):
2596
  """Reboot an instance.
2597

2598
  """
2599
  HPATH = "instance-reboot"
2600
  HTYPE = constants.HTYPE_INSTANCE
2601
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2602
  REQ_BGL = False
2603

    
2604
  def ExpandNames(self):
2605
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2606
                                   constants.INSTANCE_REBOOT_HARD,
2607
                                   constants.INSTANCE_REBOOT_FULL]:
2608
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2609
                                  (constants.INSTANCE_REBOOT_SOFT,
2610
                                   constants.INSTANCE_REBOOT_HARD,
2611
                                   constants.INSTANCE_REBOOT_FULL))
2612
    self._ExpandAndLockInstance()
2613

    
2614
  def BuildHooksEnv(self):
2615
    """Build hooks env.
2616

2617
    This runs on master, primary and secondary nodes of the instance.
2618

2619
    """
2620
    env = {
2621
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2622
      }
2623
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2624
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2625
          list(self.instance.secondary_nodes))
2626
    return env, nl, nl
2627

    
2628
  def CheckPrereq(self):
2629
    """Check prerequisites.
2630

2631
    This checks that the instance is in the cluster.
2632

2633
    """
2634
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2635
    assert self.instance is not None, \
2636
      "Cannot retrieve locked instance %s" % self.op.instance_name
2637

    
2638
    _CheckNodeOnline(self, instance.primary_node)
2639

    
2640
    # check bridges existance
2641
    _CheckInstanceBridgesExist(self, instance)
2642

    
2643
  def Exec(self, feedback_fn):
2644
    """Reboot the instance.
2645

2646
    """
2647
    instance = self.instance
2648
    ignore_secondaries = self.op.ignore_secondaries
2649
    reboot_type = self.op.reboot_type
2650
    extra_args = getattr(self.op, "extra_args", "")
2651

    
2652
    node_current = instance.primary_node
2653

    
2654
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2655
                       constants.INSTANCE_REBOOT_HARD]:
2656
      result = self.rpc.call_instance_reboot(node_current, instance,
2657
                                             reboot_type, extra_args)
2658
      if result.failed or not result.data:
2659
        raise errors.OpExecError("Could not reboot instance")
2660
    else:
2661
      if not self.rpc.call_instance_shutdown(node_current, instance):
2662
        raise errors.OpExecError("could not shutdown instance for full reboot")
2663
      _ShutdownInstanceDisks(self, instance)
2664
      _StartInstanceDisks(self, instance, ignore_secondaries)
2665
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2666
      if result.failed or not result.data:
2667
        _ShutdownInstanceDisks(self, instance)
2668
        raise errors.OpExecError("Could not start instance for full reboot")
2669

    
2670
    self.cfg.MarkInstanceUp(instance.name)
2671

    
2672

    
2673
class LUShutdownInstance(LogicalUnit):
2674
  """Shutdown an instance.
2675

2676
  """
2677
  HPATH = "instance-stop"
2678
  HTYPE = constants.HTYPE_INSTANCE
2679
  _OP_REQP = ["instance_name"]
2680
  REQ_BGL = False
2681

    
2682
  def ExpandNames(self):
2683
    self._ExpandAndLockInstance()
2684

    
2685
  def BuildHooksEnv(self):
2686
    """Build hooks env.
2687

2688
    This runs on master, primary and secondary nodes of the instance.
2689

2690
    """
2691
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2692
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2693
          list(self.instance.secondary_nodes))
2694
    return env, nl, nl
2695

    
2696
  def CheckPrereq(self):
2697
    """Check prerequisites.
2698

2699
    This checks that the instance is in the cluster.
2700

2701
    """
2702
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2703
    assert self.instance is not None, \
2704
      "Cannot retrieve locked instance %s" % self.op.instance_name
2705
    _CheckNodeOnline(self, instance.primary_node)
2706

    
2707
  def Exec(self, feedback_fn):
2708
    """Shutdown the instance.
2709

2710
    """
2711
    instance = self.instance
2712
    node_current = instance.primary_node
2713
    self.cfg.MarkInstanceDown(instance.name)
2714
    result = self.rpc.call_instance_shutdown(node_current, instance)
2715
    if result.failed or not result.data:
2716
      self.proc.LogWarning("Could not shutdown instance")
2717

    
2718
    _ShutdownInstanceDisks(self, instance)
2719

    
2720

    
2721
class LUReinstallInstance(LogicalUnit):
2722
  """Reinstall an instance.
2723

2724
  """
2725
  HPATH = "instance-reinstall"
2726
  HTYPE = constants.HTYPE_INSTANCE
2727
  _OP_REQP = ["instance_name"]
2728
  REQ_BGL = False
2729

    
2730
  def ExpandNames(self):
2731
    self._ExpandAndLockInstance()
2732

    
2733
  def BuildHooksEnv(self):
2734
    """Build hooks env.
2735

2736
    This runs on master, primary and secondary nodes of the instance.
2737

2738
    """
2739
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2740
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2741
          list(self.instance.secondary_nodes))
2742
    return env, nl, nl
2743

    
2744
  def CheckPrereq(self):
2745
    """Check prerequisites.
2746

2747
    This checks that the instance is in the cluster and is not running.
2748

2749
    """
2750
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2751
    assert instance is not None, \
2752
      "Cannot retrieve locked instance %s" % self.op.instance_name
2753
    _CheckNodeOnline(self, instance.primary_node)
2754

    
2755
    if instance.disk_template == constants.DT_DISKLESS:
2756
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2757
                                 self.op.instance_name)
2758
    if instance.status != "down":
2759
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2760
                                 self.op.instance_name)
2761
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2762
                                              instance.name,
2763
                                              instance.hypervisor)
2764
    if remote_info.failed or remote_info.data:
2765
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2766
                                 (self.op.instance_name,
2767
                                  instance.primary_node))
2768

    
2769
    self.op.os_type = getattr(self.op, "os_type", None)
2770
    if self.op.os_type is not None:
2771
      # OS verification
2772
      pnode = self.cfg.GetNodeInfo(
2773
        self.cfg.ExpandNodeName(instance.primary_node))
2774
      if pnode is None:
2775
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2776
                                   self.op.pnode)
2777
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2778
      result.Raise()
2779
      if not isinstance(result.data, objects.OS):
2780
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2781
                                   " primary node"  % self.op.os_type)
2782

    
2783
    self.instance = instance
2784

    
2785
  def Exec(self, feedback_fn):
2786
    """Reinstall the instance.
2787

2788
    """
2789
    inst = self.instance
2790

    
2791
    if self.op.os_type is not None:
2792
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2793
      inst.os = self.op.os_type
2794
      self.cfg.Update(inst)
2795

    
2796
    _StartInstanceDisks(self, inst, None)
2797
    try:
2798
      feedback_fn("Running the instance OS create scripts...")
2799
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2800
      result.Raise()
2801
      if not result.data:
2802
        raise errors.OpExecError("Could not install OS for instance %s"
2803
                                 " on node %s" %
2804
                                 (inst.name, inst.primary_node))
2805
    finally:
2806
      _ShutdownInstanceDisks(self, inst)
2807

    
2808

    
2809
class LURenameInstance(LogicalUnit):
2810
  """Rename an instance.
2811

2812
  """
2813
  HPATH = "instance-rename"
2814
  HTYPE = constants.HTYPE_INSTANCE
2815
  _OP_REQP = ["instance_name", "new_name"]
2816

    
2817
  def BuildHooksEnv(self):
2818
    """Build hooks env.
2819

2820
    This runs on master, primary and secondary nodes of the instance.
2821

2822
    """
2823
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2824
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2825
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2826
          list(self.instance.secondary_nodes))
2827
    return env, nl, nl
2828

    
2829
  def CheckPrereq(self):
2830
    """Check prerequisites.
2831

2832
    This checks that the instance is in the cluster and is not running.
2833

2834
    """
2835
    instance = self.cfg.GetInstanceInfo(
2836
      self.cfg.ExpandInstanceName(self.op.instance_name))
2837
    if instance is None:
2838
      raise errors.OpPrereqError("Instance '%s' not known" %
2839
                                 self.op.instance_name)
2840
    _CheckNodeOnline(self, instance.primary_node)
2841

    
2842
    if instance.status != "down":
2843
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2844
                                 self.op.instance_name)
2845
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2846
                                              instance.name,
2847
                                              instance.hypervisor)
2848
    remote_info.Raise()
2849
    if remote_info.data:
2850
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2851
                                 (self.op.instance_name,
2852
                                  instance.primary_node))
2853
    self.instance = instance
2854

    
2855
    # new name verification
2856
    name_info = utils.HostInfo(self.op.new_name)
2857

    
2858
    self.op.new_name = new_name = name_info.name
2859
    instance_list = self.cfg.GetInstanceList()
2860
    if new_name in instance_list:
2861
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2862
                                 new_name)
2863

    
2864
    if not getattr(self.op, "ignore_ip", False):
2865
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2866
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2867
                                   (name_info.ip, new_name))
2868

    
2869

    
2870
  def Exec(self, feedback_fn):
2871
    """Reinstall the instance.
2872

2873
    """
2874
    inst = self.instance
2875
    old_name = inst.name
2876

    
2877
    if inst.disk_template == constants.DT_FILE:
2878
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2879

    
2880
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2881
    # Change the instance lock. This is definitely safe while we hold the BGL
2882
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2883
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2884

    
2885
    # re-read the instance from the configuration after rename
2886
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2887

    
2888
    if inst.disk_template == constants.DT_FILE:
2889
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2890
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2891
                                                     old_file_storage_dir,
2892
                                                     new_file_storage_dir)
2893
      result.Raise()
2894
      if not result.data:
2895
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2896
                                 " directory '%s' to '%s' (but the instance"
2897
                                 " has been renamed in Ganeti)" % (
2898
                                 inst.primary_node, old_file_storage_dir,
2899
                                 new_file_storage_dir))
2900

    
2901
      if not result.data[0]:
2902
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2903
                                 " (but the instance has been renamed in"
2904
                                 " Ganeti)" % (old_file_storage_dir,
2905
                                               new_file_storage_dir))
2906

    
2907
    _StartInstanceDisks(self, inst, None)
2908
    try:
2909
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2910
                                                 old_name)
2911
      if result.failed or not result.data:
2912
        msg = ("Could not run OS rename script for instance %s on node %s"
2913
               " (but the instance has been renamed in Ganeti)" %
2914
               (inst.name, inst.primary_node))
2915
        self.proc.LogWarning(msg)
2916
    finally:
2917
      _ShutdownInstanceDisks(self, inst)
2918

    
2919

    
2920
class LURemoveInstance(LogicalUnit):
2921
  """Remove an instance.
2922

2923
  """
2924
  HPATH = "instance-remove"
2925
  HTYPE = constants.HTYPE_INSTANCE
2926
  _OP_REQP = ["instance_name", "ignore_failures"]
2927
  REQ_BGL = False
2928

    
2929
  def ExpandNames(self):
2930
    self._ExpandAndLockInstance()
2931
    self.needed_locks[locking.LEVEL_NODE] = []
2932
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2933

    
2934
  def DeclareLocks(self, level):
2935
    if level == locking.LEVEL_NODE:
2936
      self._LockInstancesNodes()
2937

    
2938
  def BuildHooksEnv(self):
2939
    """Build hooks env.
2940

2941
    This runs on master, primary and secondary nodes of the instance.
2942

2943
    """
2944
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2945
    nl = [self.cfg.GetMasterNode()]
2946
    return env, nl, nl
2947

    
2948
  def CheckPrereq(self):
2949
    """Check prerequisites.
2950

2951
    This checks that the instance is in the cluster.
2952

2953
    """
2954
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2955
    assert self.instance is not None, \
2956
      "Cannot retrieve locked instance %s" % self.op.instance_name
2957

    
2958
  def Exec(self, feedback_fn):
2959
    """Remove the instance.
2960

2961
    """
2962
    instance = self.instance
2963
    logging.info("Shutting down instance %s on node %s",
2964
                 instance.name, instance.primary_node)
2965

    
2966
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2967
    if result.failed or not result.data:
2968
      if self.op.ignore_failures:
2969
        feedback_fn("Warning: can't shutdown instance")
2970
      else:
2971
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2972
                                 (instance.name, instance.primary_node))
2973

    
2974
    logging.info("Removing block devices for instance %s", instance.name)
2975

    
2976
    if not _RemoveDisks(self, instance):
2977
      if self.op.ignore_failures:
2978
        feedback_fn("Warning: can't remove instance's disks")
2979
      else:
2980
        raise errors.OpExecError("Can't remove instance's disks")
2981

    
2982
    logging.info("Removing instance %s out of cluster config", instance.name)
2983

    
2984
    self.cfg.RemoveInstance(instance.name)
2985
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2986

    
2987

    
2988
class LUQueryInstances(NoHooksLU):
2989
  """Logical unit for querying instances.
2990

2991
  """
2992
  _OP_REQP = ["output_fields", "names"]
2993
  REQ_BGL = False
2994
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2995
                                    "admin_state", "admin_ram",
2996
                                    "disk_template", "ip", "mac", "bridge",
2997
                                    "sda_size", "sdb_size", "vcpus", "tags",
2998
                                    "network_port", "beparams",
2999
                                    "(disk).(size)/([0-9]+)",
3000
                                    "(disk).(sizes)",
3001
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
3002
                                    "(nic).(macs|ips|bridges)",
3003
                                    "(disk|nic).(count)",
3004
                                    "serial_no", "hypervisor", "hvparams",] +
3005
                                  ["hv/%s" % name
3006
                                   for name in constants.HVS_PARAMETERS] +
3007
                                  ["be/%s" % name
3008
                                   for name in constants.BES_PARAMETERS])
3009
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3010

    
3011

    
3012
  def ExpandNames(self):
3013
    _CheckOutputFields(static=self._FIELDS_STATIC,
3014
                       dynamic=self._FIELDS_DYNAMIC,
3015
                       selected=self.op.output_fields)
3016

    
3017
    self.needed_locks = {}
3018
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3019
    self.share_locks[locking.LEVEL_NODE] = 1
3020

    
3021
    if self.op.names:
3022
      self.wanted = _GetWantedInstances(self, self.op.names)
3023
    else:
3024
      self.wanted = locking.ALL_SET
3025

    
3026
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3027
    if self.do_locking:
3028
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3029
      self.needed_locks[locking.LEVEL_NODE] = []
3030
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3031

    
3032
  def DeclareLocks(self, level):
3033
    if level == locking.LEVEL_NODE and self.do_locking:
3034
      self._LockInstancesNodes()
3035

    
3036
  def CheckPrereq(self):
3037
    """Check prerequisites.
3038

3039
    """
3040
    pass
3041

    
3042
  def Exec(self, feedback_fn):
3043
    """Computes the list of nodes and their attributes.
3044

3045
    """
3046
    all_info = self.cfg.GetAllInstancesInfo()
3047
    if self.do_locking:
3048
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3049
    elif self.wanted != locking.ALL_SET:
3050
      instance_names = self.wanted
3051
      missing = set(instance_names).difference(all_info.keys())
3052
      if missing:
3053
        raise errors.OpExecError(
3054
          "Some instances were removed before retrieving their data: %s"
3055
          % missing)
3056
    else:
3057
      instance_names = all_info.keys()
3058

    
3059
    instance_names = utils.NiceSort(instance_names)
3060
    instance_list = [all_info[iname] for iname in instance_names]
3061

    
3062
    # begin data gathering
3063

    
3064
    nodes = frozenset([inst.primary_node for inst in instance_list])
3065
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3066

    
3067
    bad_nodes = []
3068
    off_nodes = []
3069
    if self.do_locking:
3070
      live_data = {}
3071
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3072
      for name in nodes:
3073
        result = node_data[name]
3074
        if result.offline:
3075
          # offline nodes will be in both lists
3076
          off_nodes.append(name)
3077
        if result.failed:
3078
          bad_nodes.append(name)
3079
        else:
3080
          if result.data:
3081
            live_data.update(result.data)
3082
            # else no instance is alive
3083
    else:
3084
      live_data = dict([(name, {}) for name in instance_names])
3085

    
3086
    # end data gathering
3087

    
3088
    HVPREFIX = "hv/"
3089
    BEPREFIX = "be/"
3090
    output = []
3091
    for instance in instance_list:
3092
      iout = []
3093
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3094
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3095
      for field in self.op.output_fields:
3096
        st_match = self._FIELDS_STATIC.Matches(field)
3097
        if field == "name":
3098
          val = instance.name
3099
        elif field == "os":
3100
          val = instance.os
3101
        elif field == "pnode":
3102
          val = instance.primary_node
3103
        elif field == "snodes":
3104
          val = list(instance.secondary_nodes)
3105
        elif field == "admin_state":
3106
          val = (instance.status != "down")
3107
        elif field == "oper_state":
3108
          if instance.primary_node in bad_nodes:
3109
            val = None
3110
          else:
3111
            val = bool(live_data.get(instance.name))
3112
        elif field == "status":
3113
          if instance.primary_node in off_nodes:
3114
            val = "ERROR_nodeoffline"
3115
          elif instance.primary_node in bad_nodes:
3116
            val = "ERROR_nodedown"
3117
          else:
3118
            running = bool(live_data.get(instance.name))
3119
            if running:
3120
              if instance.status != "down":
3121
                val = "running"
3122
              else:
3123
                val = "ERROR_up"
3124
            else:
3125
              if instance.status != "down":
3126
                val = "ERROR_down"
3127
              else:
3128
                val = "ADMIN_down"
3129
        elif field == "oper_ram":
3130
          if instance.primary_node in bad_nodes:
3131
            val = None
3132
          elif instance.name in live_data:
3133
            val = live_data[instance.name].get("memory", "?")
3134
          else:
3135
            val = "-"
3136
        elif field == "disk_template":
3137
          val = instance.disk_template
3138
        elif field == "ip":
3139
          val = instance.nics[0].ip
3140
        elif field == "bridge":
3141
          val = instance.nics[0].bridge
3142
        elif field == "mac":
3143
          val = instance.nics[0].mac
3144
        elif field == "sda_size" or field == "sdb_size":
3145
          idx = ord(field[2]) - ord('a')
3146
          try:
3147
            val = instance.FindDisk(idx).size
3148
          except errors.OpPrereqError:
3149
            val = None
3150
        elif field == "tags":
3151
          val = list(instance.GetTags())
3152
        elif field == "serial_no":
3153
          val = instance.serial_no
3154
        elif field == "network_port":
3155
          val = instance.network_port
3156
        elif field == "hypervisor":
3157
          val = instance.hypervisor
3158
        elif field == "hvparams":
3159
          val = i_hv
3160
        elif (field.startswith(HVPREFIX) and
3161
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3162
          val = i_hv.get(field[len(HVPREFIX):], None)
3163
        elif field == "beparams":
3164
          val = i_be
3165
        elif (field.startswith(BEPREFIX) and
3166
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3167
          val = i_be.get(field[len(BEPREFIX):], None)
3168
        elif st_match and st_match.groups():
3169
          # matches a variable list
3170
          st_groups = st_match.groups()
3171
          if st_groups and st_groups[0] == "disk":
3172
            if st_groups[1] == "count":
3173
              val = len(instance.disks)
3174
            elif st_groups[1] == "sizes":
3175
              val = [disk.size for disk in instance.disks]
3176
            elif st_groups[1] == "size":
3177
              try:
3178
                val = instance.FindDisk(st_groups[2]).size
3179
              except errors.OpPrereqError:
3180
                val = None
3181
            else:
3182
              assert False, "Unhandled disk parameter"
3183
          elif st_groups[0] == "nic":
3184
            if st_groups[1] == "count":
3185
              val = len(instance.nics)
3186
            elif st_groups[1] == "macs":
3187
              val = [nic.mac for nic in instance.nics]
3188
            elif st_groups[1] == "ips":
3189
              val = [nic.ip for nic in instance.nics]
3190
            elif st_groups[1] == "bridges":
3191
              val = [nic.bridge for nic in instance.nics]
3192
            else:
3193
              # index-based item
3194
              nic_idx = int(st_groups[2])
3195
              if nic_idx >= len(instance.nics):
3196
                val = None
3197
              else:
3198
                if st_groups[1] == "mac":
3199
                  val = instance.nics[nic_idx].mac
3200
                elif st_groups[1] == "ip":
3201
                  val = instance.nics[nic_idx].ip
3202
                elif st_groups[1] == "bridge":
3203
                  val = instance.nics[nic_idx].bridge
3204
                else:
3205
                  assert False, "Unhandled NIC parameter"
3206
          else:
3207
            assert False, "Unhandled variable parameter"
3208
        else:
3209
          raise errors.ParameterError(field)
3210
        iout.append(val)
3211
      output.append(iout)
3212

    
3213
    return output
3214

    
3215

    
3216
class LUFailoverInstance(LogicalUnit):
3217
  """Failover an instance.
3218

3219
  """
3220
  HPATH = "instance-failover"
3221
  HTYPE = constants.HTYPE_INSTANCE
3222
  _OP_REQP = ["instance_name", "ignore_consistency"]
3223
  REQ_BGL = False
3224

    
3225
  def ExpandNames(self):
3226
    self._ExpandAndLockInstance()
3227
    self.needed_locks[locking.LEVEL_NODE] = []
3228
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3229

    
3230
  def DeclareLocks(self, level):
3231
    if level == locking.LEVEL_NODE:
3232
      self._LockInstancesNodes()
3233

    
3234
  def BuildHooksEnv(self):
3235
    """Build hooks env.
3236

3237
    This runs on master, primary and secondary nodes of the instance.
3238

3239
    """
3240
    env = {
3241
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3242
      }
3243
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3244
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3245
    return env, nl, nl
3246

    
3247
  def CheckPrereq(self):
3248
    """Check prerequisites.
3249

3250
    This checks that the instance is in the cluster.
3251

3252
    """
3253
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3254
    assert self.instance is not None, \
3255
      "Cannot retrieve locked instance %s" % self.op.instance_name
3256

    
3257
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3258
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3259
      raise errors.OpPrereqError("Instance's disk layout is not"
3260
                                 " network mirrored, cannot failover.")
3261

    
3262
    secondary_nodes = instance.secondary_nodes
3263
    if not secondary_nodes:
3264
      raise errors.ProgrammerError("no secondary node but using "
3265
                                   "a mirrored disk template")
3266

    
3267
    target_node = secondary_nodes[0]
3268
    _CheckNodeOnline(self, target_node)
3269
    # check memory requirements on the secondary node
3270
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3271
                         instance.name, bep[constants.BE_MEMORY],
3272
                         instance.hypervisor)
3273

    
3274
    # check bridge existance
3275
    brlist = [nic.bridge for nic in instance.nics]
3276
    result = self.rpc.call_bridges_exist(target_node, brlist)
3277
    result.Raise()
3278
    if not result.data:
3279
      raise errors.OpPrereqError("One or more target bridges %s does not"
3280
                                 " exist on destination node '%s'" %
3281
                                 (brlist, target_node))
3282

    
3283
  def Exec(self, feedback_fn):
3284
    """Failover an instance.
3285

3286
    The failover is done by shutting it down on its present node and
3287
    starting it on the secondary.
3288

3289
    """
3290
    instance = self.instance
3291

    
3292
    source_node = instance.primary_node
3293
    target_node = instance.secondary_nodes[0]
3294

    
3295
    feedback_fn("* checking disk consistency between source and target")
3296
    for dev in instance.disks:
3297
      # for drbd, these are drbd over lvm
3298
      if not _CheckDiskConsistency(self, dev, target_node, False):
3299
        if instance.status == "up" and not self.op.ignore_consistency:
3300
          raise errors.OpExecError("Disk %s is degraded on target node,"
3301
                                   " aborting failover." % dev.iv_name)
3302

    
3303
    feedback_fn("* shutting down instance on source node")
3304
    logging.info("Shutting down instance %s on node %s",
3305
                 instance.name, source_node)
3306

    
3307
    result = self.rpc.call_instance_shutdown(source_node, instance)
3308
    if result.failed or not result.data:
3309
      if self.op.ignore_consistency:
3310
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3311
                             " Proceeding"
3312
                             " anyway. Please make sure node %s is down",
3313
                             instance.name, source_node, source_node)
3314
      else:
3315
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3316
                                 (instance.name, source_node))
3317

    
3318
    feedback_fn("* deactivating the instance's disks on source node")
3319
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3320
      raise errors.OpExecError("Can't shut down the instance's disks.")
3321

    
3322
    instance.primary_node = target_node
3323
    # distribute new instance config to the other nodes
3324
    self.cfg.Update(instance)
3325

    
3326
    # Only start the instance if it's marked as up
3327
    if instance.status == "up":
3328
      feedback_fn("* activating the instance's disks on target node")
3329
      logging.info("Starting instance %s on node %s",
3330
                   instance.name, target_node)
3331

    
3332
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3333
                                               ignore_secondaries=True)
3334
      if not disks_ok:
3335
        _ShutdownInstanceDisks(self, instance)
3336
        raise errors.OpExecError("Can't activate the instance's disks")
3337

    
3338
      feedback_fn("* starting the instance on the target node")
3339
      result = self.rpc.call_instance_start(target_node, instance, None)
3340
      if result.failed or not result.data:
3341
        _ShutdownInstanceDisks(self, instance)
3342
        raise errors.OpExecError("Could not start instance %s on node %s." %
3343
                                 (instance.name, target_node))
3344

    
3345

    
3346
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3347
  """Create a tree of block devices on the primary node.
3348

3349
  This always creates all devices.
3350

3351
  """
3352
  if device.children:
3353
    for child in device.children:
3354
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3355
        return False
3356

    
3357
  lu.cfg.SetDiskID(device, node)
3358
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3359
                                       instance.name, True, info)
3360
  if new_id.failed or not new_id.data:
3361
    return False
3362
  if device.physical_id is None:
3363
    device.physical_id = new_id
3364
  return True
3365

    
3366

    
3367
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3368
  """Create a tree of block devices on a secondary node.
3369

3370
  If this device type has to be created on secondaries, create it and
3371
  all its children.
3372

3373
  If not, just recurse to children keeping the same 'force' value.
3374

3375
  """
3376
  if device.CreateOnSecondary():
3377
    force = True
3378
  if device.children:
3379
    for child in device.children:
3380
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3381
                                        child, force, info):
3382
        return False
3383

    
3384
  if not force:
3385
    return True
3386
  lu.cfg.SetDiskID(device, node)
3387
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3388
                                       instance.name, False, info)
3389
  if new_id.failed or not new_id.data:
3390
    return False
3391
  if device.physical_id is None:
3392
    device.physical_id = new_id
3393
  return True
3394

    
3395

    
3396
def _GenerateUniqueNames(lu, exts):
3397
  """Generate a suitable LV name.
3398

3399
  This will generate a logical volume name for the given instance.
3400

3401
  """
3402
  results = []
3403
  for val in exts:
3404
    new_id = lu.cfg.GenerateUniqueID()
3405
    results.append("%s%s" % (new_id, val))
3406
  return results
3407

    
3408

    
3409
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3410
                         p_minor, s_minor):
3411
  """Generate a drbd8 device complete with its children.
3412

3413
  """
3414
  port = lu.cfg.AllocatePort()
3415
  vgname = lu.cfg.GetVGName()
3416
  shared_secret = lu.cfg.GenerateDRBDSecret()
3417
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3418
                          logical_id=(vgname, names[0]))
3419
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3420
                          logical_id=(vgname, names[1]))
3421
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3422
                          logical_id=(primary, secondary, port,
3423
                                      p_minor, s_minor,
3424
                                      shared_secret),
3425
                          children=[dev_data, dev_meta],
3426
                          iv_name=iv_name)
3427
  return drbd_dev
3428

    
3429

    
3430
def _GenerateDiskTemplate(lu, template_name,
3431
                          instance_name, primary_node,
3432
                          secondary_nodes, disk_info,
3433
                          file_storage_dir, file_driver,
3434
                          base_index):
3435
  """Generate the entire disk layout for a given template type.
3436

3437
  """
3438
  #TODO: compute space requirements
3439

    
3440
  vgname = lu.cfg.GetVGName()
3441
  disk_count = len(disk_info)
3442
  disks = []
3443
  if template_name == constants.DT_DISKLESS:
3444
    pass
3445
  elif template_name == constants.DT_PLAIN:
3446
    if len(secondary_nodes) != 0:
3447
      raise errors.ProgrammerError("Wrong template configuration")
3448

    
3449
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3450
                                      for i in range(disk_count)])
3451
    for idx, disk in enumerate(disk_info):
3452
      disk_index = idx + base_index
3453
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3454
                              logical_id=(vgname, names[idx]),
3455
                              iv_name="disk/%d" % disk_index)
3456
      disks.append(disk_dev)
3457
  elif template_name == constants.DT_DRBD8:
3458
    if len(secondary_nodes) != 1:
3459
      raise errors.ProgrammerError("Wrong template configuration")
3460
    remote_node = secondary_nodes[0]
3461
    minors = lu.cfg.AllocateDRBDMinor(
3462
      [primary_node, remote_node] * len(disk_info), instance_name)
3463

    
3464
    names = _GenerateUniqueNames(lu,
3465
                                 [".disk%d_%s" % (i, s)
3466
                                  for i in range(disk_count)
3467
                                  for s in ("data", "meta")
3468
                                  ])
3469
    for idx, disk in enumerate(disk_info):
3470
      disk_index = idx + base_index
3471
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3472
                                      disk["size"], names[idx*2:idx*2+2],
3473
                                      "disk/%d" % disk_index,
3474
                                      minors[idx*2], minors[idx*2+1])
3475
      disks.append(disk_dev)
3476
  elif template_name == constants.DT_FILE:
3477
    if len(secondary_nodes) != 0:
3478
      raise errors.ProgrammerError("Wrong template configuration")
3479

    
3480
    for idx, disk in enumerate(disk_info):
3481
      disk_index = idx + base_index
3482
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3483
                              iv_name="disk/%d" % disk_index,
3484
                              logical_id=(file_driver,
3485
                                          "%s/disk%d" % (file_storage_dir,
3486
                                                         idx)))
3487
      disks.append(disk_dev)
3488
  else:
3489
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3490
  return disks
3491

    
3492

    
3493
def _GetInstanceInfoText(instance):
3494
  """Compute that text that should be added to the disk's metadata.
3495

3496
  """
3497
  return "originstname+%s" % instance.name
3498

    
3499

    
3500
def _CreateDisks(lu, instance):
3501
  """Create all disks for an instance.
3502

3503
  This abstracts away some work from AddInstance.
3504

3505
  @type lu: L{LogicalUnit}
3506
  @param lu: the logical unit on whose behalf we execute
3507
  @type instance: L{objects.Instance}
3508
  @param instance: the instance whose disks we should create
3509
  @rtype: boolean
3510
  @return: the success of the creation
3511

3512
  """
3513
  info = _GetInstanceInfoText(instance)
3514

    
3515
  if instance.disk_template == constants.DT_FILE:
3516
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3517
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3518
                                                 file_storage_dir)
3519

    
3520
    if result.failed or not result.data:
3521
      logging.error("Could not connect to node '%s'", instance.primary_node)
3522
      return False
3523

    
3524
    if not result.data[0]:
3525
      logging.error("Failed to create directory '%s'", file_storage_dir)
3526
      return False
3527

    
3528
  # Note: this needs to be kept in sync with adding of disks in
3529
  # LUSetInstanceParams
3530
  for device in instance.disks:
3531
    logging.info("Creating volume %s for instance %s",
3532
                 device.iv_name, instance.name)
3533
    #HARDCODE
3534
    for secondary_node in instance.secondary_nodes:
3535
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3536
                                        device, False, info):
3537
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3538
                      device.iv_name, device, secondary_node)
3539
        return False
3540
    #HARDCODE
3541
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3542
                                    instance, device, info):
3543
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3544
      return False
3545

    
3546
  return True
3547

    
3548

    
3549
def _RemoveDisks(lu, instance):
3550
  """Remove all disks for an instance.
3551

3552
  This abstracts away some work from `AddInstance()` and
3553
  `RemoveInstance()`. Note that in case some of the devices couldn't
3554
  be removed, the removal will continue with the other ones (compare
3555
  with `_CreateDisks()`).
3556

3557
  @type lu: L{LogicalUnit}
3558
  @param lu: the logical unit on whose behalf we execute
3559
  @type instance: L{objects.Instance}
3560
  @param instance: the instance whose disks we should remove
3561
  @rtype: boolean
3562
  @return: the success of the removal
3563

3564
  """
3565
  logging.info("Removing block devices for instance %s", instance.name)
3566

    
3567
  result = True
3568
  for device in instance.disks:
3569
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3570
      lu.cfg.SetDiskID(disk, node)
3571
      result = lu.rpc.call_blockdev_remove(node, disk)
3572
      if result.failed or not result.data:
3573
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3574
                           " continuing anyway", device.iv_name, node)
3575
        result = False
3576

    
3577
  if instance.disk_template == constants.DT_FILE:
3578
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3579
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3580
                                                 file_storage_dir)
3581
    if result.failed or not result.data:
3582
      logging.error("Could not remove directory '%s'", file_storage_dir)
3583
      result = False
3584

    
3585
  return result
3586

    
3587

    
3588
def _ComputeDiskSize(disk_template, disks):
3589
  """Compute disk size requirements in the volume group
3590

3591
  """
3592
  # Required free disk space as a function of disk and swap space
3593
  req_size_dict = {
3594
    constants.DT_DISKLESS: None,
3595
    constants.DT_PLAIN: sum(d["size"] for d in disks),
3596
    # 128 MB are added for drbd metadata for each disk
3597
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3598
    constants.DT_FILE: None,
3599
  }
3600

    
3601
  if disk_template not in req_size_dict:
3602
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3603
                                 " is unknown" %  disk_template)
3604

    
3605
  return req_size_dict[disk_template]
3606

    
3607

    
3608
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3609
  """Hypervisor parameter validation.
3610

3611
  This function abstract the hypervisor parameter validation to be
3612
  used in both instance create and instance modify.
3613

3614
  @type lu: L{LogicalUnit}
3615
  @param lu: the logical unit for which we check
3616
  @type nodenames: list
3617
  @param nodenames: the list of nodes on which we should check
3618
  @type hvname: string
3619
  @param hvname: the name of the hypervisor we should use
3620
  @type hvparams: dict
3621
  @param hvparams: the parameters which we need to check
3622
  @raise errors.OpPrereqError: if the parameters are not valid
3623

3624
  """
3625
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3626
                                                  hvname,
3627
                                                  hvparams)
3628
  for node in nodenames:
3629
    info = hvinfo[node]
3630
    info.Raise()
3631
    if not info.data or not isinstance(info.data, (tuple, list)):
3632
      raise errors.OpPrereqError("Cannot get current information"
3633
                                 " from node '%s' (%s)" % (node, info.data))
3634
    if not info.data[0]:
3635
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3636
                                 " %s" % info.data[1])
3637

    
3638

    
3639
class LUCreateInstance(LogicalUnit):
3640
  """Create an instance.
3641

3642
  """
3643
  HPATH = "instance-add"
3644
  HTYPE = constants.HTYPE_INSTANCE
3645
  _OP_REQP = ["instance_name", "disks", "disk_template",
3646
              "mode", "start",
3647
              "wait_for_sync", "ip_check", "nics",
3648
              "hvparams", "beparams"]
3649
  REQ_BGL = False
3650

    
3651
  def _ExpandNode(self, node):
3652
    """Expands and checks one node name.
3653

3654
    """
3655
    node_full = self.cfg.ExpandNodeName(node)
3656
    if node_full is None:
3657
      raise errors.OpPrereqError("Unknown node %s" % node)
3658
    return node_full
3659

    
3660
  def ExpandNames(self):
3661
    """ExpandNames for CreateInstance.
3662

3663
    Figure out the right locks for instance creation.
3664

3665
    """
3666
    self.needed_locks = {}
3667

    
3668
    # set optional parameters to none if they don't exist
3669
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3670
      if not hasattr(self.op, attr):
3671
        setattr(self.op, attr, None)
3672

    
3673
    # cheap checks, mostly valid constants given
3674

    
3675
    # verify creation mode
3676
    if self.op.mode not in (constants.INSTANCE_CREATE,
3677
                            constants.INSTANCE_IMPORT):
3678
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3679
                                 self.op.mode)
3680

    
3681
    # disk template and mirror node verification
3682
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3683
      raise errors.OpPrereqError("Invalid disk template name")
3684

    
3685
    if self.op.hypervisor is None:
3686
      self.op.hypervisor = self.cfg.GetHypervisorType()
3687

    
3688
    cluster = self.cfg.GetClusterInfo()
3689
    enabled_hvs = cluster.enabled_hypervisors
3690
    if self.op.hypervisor not in enabled_hvs:
3691
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3692
                                 " cluster (%s)" % (self.op.hypervisor,
3693
                                  ",".join(enabled_hvs)))
3694

    
3695
    # check hypervisor parameter syntax (locally)
3696

    
3697
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3698
                                  self.op.hvparams)
3699
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3700
    hv_type.CheckParameterSyntax(filled_hvp)
3701

    
3702
    # fill and remember the beparams dict
3703
    utils.CheckBEParams(self.op.beparams)
3704
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3705
                                    self.op.beparams)
3706

    
3707
    #### instance parameters check
3708

    
3709
    # instance name verification
3710
    hostname1 = utils.HostInfo(self.op.instance_name)
3711
    self.op.instance_name = instance_name = hostname1.name
3712

    
3713
    # this is just a preventive check, but someone might still add this
3714
    # instance in the meantime, and creation will fail at lock-add time
3715
    if instance_name in self.cfg.GetInstanceList():
3716
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3717
                                 instance_name)
3718

    
3719
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3720

    
3721
    # NIC buildup
3722
    self.nics = []
3723
    for nic in self.op.nics:
3724
      # ip validity checks
3725
      ip = nic.get("ip", None)
3726
      if ip is None or ip.lower() == "none":
3727
        nic_ip = None
3728
      elif ip.lower() == constants.VALUE_AUTO:
3729
        nic_ip = hostname1.ip
3730
      else:
3731
        if not utils.IsValidIP(ip):
3732
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3733
                                     " like a valid IP" % ip)
3734
        nic_ip = ip
3735

    
3736
      # MAC address verification
3737
      mac = nic.get("mac", constants.VALUE_AUTO)
3738
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3739
        if not utils.IsValidMac(mac.lower()):
3740
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3741
                                     mac)
3742
      # bridge verification
3743
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3744
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3745

    
3746
    # disk checks/pre-build
3747
    self.disks = []
3748
    for disk in self.op.disks:
3749
      mode = disk.get("mode", constants.DISK_RDWR)
3750
      if mode not in constants.DISK_ACCESS_SET:
3751
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3752
                                   mode)
3753
      size = disk.get("size", None)
3754
      if size is None:
3755
        raise errors.OpPrereqError("Missing disk size")
3756
      try:
3757
        size = int(size)
3758
      except ValueError:
3759
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3760
      self.disks.append({"size": size, "mode": mode})
3761

    
3762
    # used in CheckPrereq for ip ping check
3763
    self.check_ip = hostname1.ip
3764

    
3765
    # file storage checks
3766
    if (self.op.file_driver and
3767
        not self.op.file_driver in constants.FILE_DRIVER):
3768
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3769
                                 self.op.file_driver)
3770

    
3771
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3772
      raise errors.OpPrereqError("File storage directory path not absolute")
3773

    
3774
    ### Node/iallocator related checks
3775
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3776
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3777
                                 " node must be given")
3778

    
3779
    if self.op.iallocator:
3780
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3781
    else:
3782
      self.op.pnode = self._ExpandNode(self.op.pnode)
3783
      nodelist = [self.op.pnode]
3784
      if self.op.snode is not None:
3785
        self.op.snode = self._ExpandNode(self.op.snode)
3786
        nodelist.append(self.op.snode)
3787
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3788

    
3789
    # in case of import lock the source node too
3790
    if self.op.mode == constants.INSTANCE_IMPORT:
3791
      src_node = getattr(self.op, "src_node", None)
3792
      src_path = getattr(self.op, "src_path", None)
3793

    
3794
      if src_path is None:
3795
        self.op.src_path = src_path = self.op.instance_name
3796

    
3797
      if src_node is None:
3798
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3799
        self.op.src_node = None
3800
        if os.path.isabs(src_path):
3801
          raise errors.OpPrereqError("Importing an instance from an absolute"
3802
                                     " path requires a source node option.")
3803
      else:
3804
        self.op.src_node = src_node = self._ExpandNode(src_node)
3805
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3806
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
3807
        if not os.path.isabs(src_path):
3808
          self.op.src_path = src_path = \
3809
            os.path.join(constants.EXPORT_DIR, src_path)
3810

    
3811
    else: # INSTANCE_CREATE
3812
      if getattr(self.op, "os_type", None) is None:
3813
        raise errors.OpPrereqError("No guest OS specified")
3814

    
3815
  def _RunAllocator(self):
3816
    """Run the allocator based on input opcode.
3817

3818
    """
3819
    nics = [n.ToDict() for n in self.nics]
3820
    ial = IAllocator(self,
3821
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3822
                     name=self.op.instance_name,
3823
                     disk_template=self.op.disk_template,
3824
                     tags=[],
3825
                     os=self.op.os_type,
3826
                     vcpus=self.be_full[constants.BE_VCPUS],
3827
                     mem_size=self.be_full[constants.BE_MEMORY],
3828
                     disks=self.disks,
3829
                     nics=nics,
3830
                     hypervisor=self.op.hypervisor,
3831
                     )
3832

    
3833
    ial.Run(self.op.iallocator)
3834

    
3835
    if not ial.success:
3836
      raise errors.OpPrereqError("Can't compute nodes using"
3837
                                 " iallocator '%s': %s" % (self.op.iallocator,
3838
                                                           ial.info))
3839
    if len(ial.nodes) != ial.required_nodes:
3840
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3841
                                 " of nodes (%s), required %s" %
3842
                                 (self.op.iallocator, len(ial.nodes),
3843
                                  ial.required_nodes))
3844
    self.op.pnode = ial.nodes[0]
3845
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3846
                 self.op.instance_name, self.op.iallocator,
3847
                 ", ".join(ial.nodes))
3848
    if ial.required_nodes == 2:
3849
      self.op.snode = ial.nodes[1]
3850

    
3851
  def BuildHooksEnv(self):
3852
    """Build hooks env.
3853

3854
    This runs on master, primary and secondary nodes of the instance.
3855

3856
    """
3857
    env = {
3858
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3859
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3860
      "INSTANCE_ADD_MODE": self.op.mode,
3861
      }
3862
    if self.op.mode == constants.INSTANCE_IMPORT:
3863
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3864
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3865
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3866

    
3867
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3868
      primary_node=self.op.pnode,
3869
      secondary_nodes=self.secondaries,
3870
      status=self.instance_status,
3871
      os_type=self.op.os_type,
3872
      memory=self.be_full[constants.BE_MEMORY],
3873
      vcpus=self.be_full[constants.BE_VCPUS],
3874
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3875
    ))
3876

    
3877
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3878
          self.secondaries)
3879
    return env, nl, nl
3880

    
3881

    
3882
  def CheckPrereq(self):
3883
    """Check prerequisites.
3884

3885
    """
3886
    if (not self.cfg.GetVGName() and
3887
        self.op.disk_template not in constants.DTS_NOT_LVM):
3888
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3889
                                 " instances")
3890

    
3891

    
3892
    if self.op.mode == constants.INSTANCE_IMPORT:
3893
      src_node = self.op.src_node
3894
      src_path = self.op.src_path
3895

    
3896
      if src_node is None:
3897
        exp_list = self.rpc.call_export_list(
3898
          self.acquired_locks[locking.LEVEL_NODE])
3899
        found = False
3900
        for node in exp_list:
3901
          if not exp_list[node].failed and src_path in exp_list[node].data:
3902
            found = True
3903
            self.op.src_node = src_node = node
3904
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3905
                                                       src_path)
3906
            break
3907
        if not found:
3908
          raise errors.OpPrereqError("No export found for relative path %s" %
3909
                                      src_path)
3910

    
3911
      _CheckNodeOnline(self, src_node)
3912
      result = self.rpc.call_export_info(src_node, src_path)
3913
      result.Raise()
3914
      if not result.data:
3915
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3916

    
3917
      export_info = result.data
3918
      if not export_info.has_section(constants.INISECT_EXP):
3919
        raise errors.ProgrammerError("Corrupted export config")
3920

    
3921
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3922
      if (int(ei_version) != constants.EXPORT_VERSION):
3923
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3924
                                   (ei_version, constants.EXPORT_VERSION))
3925

    
3926
      # Check that the new instance doesn't have less disks than the export
3927
      instance_disks = len(self.disks)
3928
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3929
      if instance_disks < export_disks:
3930
        raise errors.OpPrereqError("Not enough disks to import."
3931
                                   " (instance: %d, export: %d)" %
3932
                                   (instance_disks, export_disks))
3933

    
3934
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3935
      disk_images = []
3936
      for idx in range(export_disks):
3937
        option = 'disk%d_dump' % idx
3938
        if export_info.has_option(constants.INISECT_INS, option):
3939
          # FIXME: are the old os-es, disk sizes, etc. useful?
3940
          export_name = export_info.get(constants.INISECT_INS, option)
3941
          image = os.path.join(src_path, export_name)
3942
          disk_images.append(image)
3943
        else:
3944
          disk_images.append(False)
3945

    
3946
      self.src_images = disk_images
3947

    
3948
      old_name = export_info.get(constants.INISECT_INS, 'name')
3949
      # FIXME: int() here could throw a ValueError on broken exports
3950
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3951
      if self.op.instance_name == old_name:
3952
        for idx, nic in enumerate(self.nics):
3953
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3954
            nic_mac_ini = 'nic%d_mac' % idx
3955
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3956

    
3957
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3958
    if self.op.start and not self.op.ip_check:
3959
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3960
                                 " adding an instance in start mode")
3961

    
3962
    if self.op.ip_check:
3963
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3964
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3965
                                   (self.check_ip, self.op.instance_name))
3966

    
3967
    #### allocator run
3968

    
3969
    if self.op.iallocator is not None:
3970
      self._RunAllocator()
3971

    
3972
    #### node related checks
3973

    
3974
    # check primary node
3975
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3976
    assert self.pnode is not None, \
3977
      "Cannot retrieve locked node %s" % self.op.pnode
3978
    if pnode.offline:
3979
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
3980
                                 pnode.name)
3981

    
3982
    self.secondaries = []
3983

    
3984
    # mirror node verification
3985
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3986
      if self.op.snode is None:
3987
        raise errors.OpPrereqError("The networked disk templates need"
3988
                                   " a mirror node")
3989
      if self.op.snode == pnode.name:
3990
        raise errors.OpPrereqError("The secondary node cannot be"
3991
                                   " the primary node.")
3992
      self.secondaries.append(self.op.snode)
3993
      _CheckNodeOnline(self, self.op.snode)
3994

    
3995
    nodenames = [pnode.name] + self.secondaries
3996

    
3997
    req_size = _ComputeDiskSize(self.op.disk_template,
3998
                                self.disks)
3999

    
4000
    # Check lv size requirements
4001
    if req_size is not None:
4002
      nodeinfo = self.rpc.call_node_info