root / lib / cmdlib.py @ 25361b9a

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need not worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


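# Illustrative sketch (not part of the original module): a minimal,
# hypothetical LU following the rules documented in LogicalUnit above
# (REQ_BGL = False, ExpandNames declaring locks, DeclareLocks delegating to
# _LockInstancesNodes). The opcode and its behaviour are made up; only the
# pattern is taken from the docstrings.
#
# class LUExampleQueryInstance(NoHooksLU):
#   """Hypothetical LU that inspects a single instance."""
#   _OP_REQP = ["instance_name"]
#   REQ_BGL = False
#
#   def ExpandNames(self):
#     # Expands self.op.instance_name and declares the instance lock
#     self._ExpandAndLockInstance()
#     # Node locks are computed later, once the instance lock is held
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()
#
#   def CheckPrereq(self):
#     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#     if self.instance is None:
#       raise errors.OpPrereqError("Instance '%s' not known" %
#                                  self.op.instance_name)
#
#   def Exec(self, feedback_fn):
#     feedback_fn("Instance %s has primary node %s" %
#                 (self.instance.name, self.instance.primary_node))
#     return self.instance.primary_node

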
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


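# Illustrative sketch (not part of the original module): how the two helpers
# above are typically used. The names are made up; both helpers return
# NiceSort'ed, fully-expanded names and raise OpPrereqError for unknown ones.
#
#   _GetWantedNodes(self, ["node1", "node2.example.tld"])
#   # => ["node1.example.tld", "node2.example.tld"]
#
#   _GetWantedInstances(self, [])
#   # => all instance names known to the cluster, nicely sorted

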
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


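# Illustrative sketch (not part of the original module): for a hypothetical
# single-NIC instance the helper above would produce a dict along these lines
# (all values made up):
#
#   _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld",
#                         ["node2.example.tld"], "debian-etch", "up", 128, 1,
#                         [("198.51.100.10", "xen-br0", "aa:00:00:11:22:33")])
#   =>
#   {"OP_TARGET": "inst1.example.tld",
#    "INSTANCE_NAME": "inst1.example.tld",
#    "INSTANCE_PRIMARY": "node1.example.tld",
#    "INSTANCE_SECONDARIES": "node2.example.tld",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128,
#    "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "198.51.100.10",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#    "INSTANCE_NIC_COUNT": 1}
#
# The GANETI_ prefix mentioned in BuildHooksEnv is added later by the hooks
# runner, not here.

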
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


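# Illustrative note (not part of the original module): RPC calls made through
# lu.rpc/self.rpc return result objects that this file checks in one of two
# ways. A hedged sketch of the pattern:
#
#   result = lu.rpc.call_bridges_exist(node, brlist)
#   result.Raise()           # raise if the RPC call itself failed
#   if not result.data:      # otherwise inspect the payload
#     ...handle the negative answer...
#
# or, where the caller prefers not to raise:
#
#   if result.failed or not result.data:
#     ...log/collect the error and continue...

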
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    vglist = node_result.get(constants.NV_VGLIST, None)
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

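  # Illustrative sketch (not part of the original module): how the N+1 check
  # above adds up memory. Suppose node C reports 1024 MiB free ('mfree') and
  # is secondary for two auto-balanced instances whose primary is node A,
  # needing 512 and 768 MiB respectively. For prinode A this gives
  #
  #   needed_mem = 512 + 768 = 1280 > mfree = 1024
  #
  # so the LU reports that node C could not accommodate the failovers should
  # node A fail. Instances with BE_AUTO_BALANCE disabled are left out of the
  # sum.
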
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase, and their failure
    makes the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: nodelist,
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo],
      constants.NV_LVLIST: vg_name,
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VGLIST: None,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, lvdata.encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        self.LogWarning("Connection to node %s failed: %s" %
                        (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)
    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


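# Illustrative sketch (not part of the original module): the helper above
# reports a disk as lvm-based as soon as the recursion reaches an LD_LV leaf.
# For a hypothetical DRBD disk built on two logical volumes (the constructor
# arguments here are assumptions, shown only to make the tree shape clear):
#
#   lv_data = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8,
#                       children=[lv_data, lv_meta], size=1024)
#
#   _RecursiveCheckIfLVMBased(drbd)     # => True
#   _RecursiveCheckIfLVMBased(lv_data)  # => True

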
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.CheckBEParams(self.op.beparams)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      node_info = self.cfg.GetAllNodesInfo().values()
      num_candidates = len([node for node in node_info
                            if node.master_candidate])
      num_nodes = len(node_info)
      if num_candidates < self.op.candidate_pool_size:
        random.shuffle(node_info)
        for node in node_info:
          if num_candidates >= self.op.candidate_pool_size:
            break
          if node.master_candidate:
            continue
          node.master_candidate = True
          self.LogInfo("Promoting node %s to master candidate", node.name)
          self.cfg.Update(node)
          self.context.ReaddNode(node)
          num_candidates += 1
      elif num_candidates > self.op.candidate_pool_size:
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
                     " of candidate_pool_size (%d)" %
                     (num_candidates, self.op.candidate_pool_size))


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


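# Illustrative sketch (not part of the original module): the index switch used
# by _CheckDiskConsistency above, applied to a plain status sequence.  The
# field layout (overall degradation vs. local-disk status) is only assumed
# from the docstring; the helper name and sample values are hypothetical.
def _ExampleIsConsistent(status, ldisk=False):
  """Return True if the chosen degradation flag in status is clear."""
  if ldisk:
    idx = 6     # local-disk status flag
  else:
    idx = 5     # overall is_degraded flag
  return not status[idx]

# Example: with status = (None, None, None, None, None, False, True) the
# device looks consistent overall (_ExampleIsConsistent(status) is True) but
# its local storage is degraded (_ExampleIsConsistent(status, ldisk=True) is
# False).
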
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @returns: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


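# Illustrative sketch (not part of the original module): computing the "valid"
# output field from the {os_name: {node: [OS objects]}} mapping returned by
# LUDiagnoseOS._DiagnoseByOS above.  Plain booleans stand in for the OS
# objects here; the helper name and sample data are hypothetical.
def _ExampleOsValidOnAllNodes(per_node_os):
  """Return True if the OS has at least one entry on every node."""
  for os_list in per_node_os.values():
    if not os_list:
      return False
  return True

# Example: {"node1": [True], "node2": []} yields False, because node2 reported
# no matching OS definition.
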
class LURemoveNode(LogicalUnit):
1544
  """Logical unit for removing a node.
1545

1546
  """
1547
  HPATH = "node-remove"
1548
  HTYPE = constants.HTYPE_NODE
1549
  _OP_REQP = ["node_name"]
1550

    
1551
  def BuildHooksEnv(self):
1552
    """Build hooks env.
1553

1554
    This doesn't run on the target node in the pre phase as a failed
1555
    node would then be impossible to remove.
1556

1557
    """
1558
    env = {
1559
      "OP_TARGET": self.op.node_name,
1560
      "NODE_NAME": self.op.node_name,
1561
      }
1562
    all_nodes = self.cfg.GetNodeList()
1563
    all_nodes.remove(self.op.node_name)
1564
    return env, all_nodes, all_nodes
1565

    
1566
  def CheckPrereq(self):
1567
    """Check prerequisites.
1568

1569
    This checks:
1570
     - the node exists in the configuration
1571
     - it does not have primary or secondary instances
1572
     - it's not the master
1573

1574
    Any errors are signalled by raising errors.OpPrereqError.
1575

1576
    """
1577
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1578
    if node is None:
1579
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1580

    
1581
    instance_list = self.cfg.GetInstanceList()
1582

    
1583
    masternode = self.cfg.GetMasterNode()
1584
    if node.name == masternode:
1585
      raise errors.OpPrereqError("Node is the master node,"
1586
                                 " you need to failover first.")
1587

    
1588
    for instance_name in instance_list:
1589
      instance = self.cfg.GetInstanceInfo(instance_name)
1590
      if node.name == instance.primary_node:
1591
        raise errors.OpPrereqError("Instance %s still running on the node,"
1592
                                   " please remove first." % instance_name)
1593
      if node.name in instance.secondary_nodes:
1594
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1595
                                   " please remove first." % instance_name)
1596
    self.op.node_name = node.name
1597
    self.node = node
1598

    
1599
  def Exec(self, feedback_fn):
1600
    """Removes the node from the cluster.
1601

1602
    """
1603
    node = self.node
1604
    logging.info("Stopping the node daemon and removing configs from node %s",
1605
                 node.name)
1606

    
1607
    self.context.RemoveNode(node.name)
1608

    
1609
    self.rpc.call_node_leave_cluster(node.name)
1610

    
1611

    
1612
class LUQueryNodes(NoHooksLU):
1613
  """Logical unit for querying nodes.
1614

1615
  """
1616
  _OP_REQP = ["output_fields", "names"]
1617
  REQ_BGL = False
1618
  _FIELDS_DYNAMIC = utils.FieldSet(
1619
    "dtotal", "dfree",
1620
    "mtotal", "mnode", "mfree",
1621
    "bootid",
1622
    "ctotal",
1623
    )
1624

    
1625
  _FIELDS_STATIC = utils.FieldSet(
1626
    "name", "pinst_cnt", "sinst_cnt",
1627
    "pinst_list", "sinst_list",
1628
    "pip", "sip", "tags",
1629
    "serial_no",
1630
    "master_candidate",
1631
    "master",
1632
    )
1633

    
1634
  def ExpandNames(self):
1635
    _CheckOutputFields(static=self._FIELDS_STATIC,
1636
                       dynamic=self._FIELDS_DYNAMIC,
1637
                       selected=self.op.output_fields)
1638

    
1639
    self.needed_locks = {}
1640
    self.share_locks[locking.LEVEL_NODE] = 1
1641

    
1642
    if self.op.names:
1643
      self.wanted = _GetWantedNodes(self, self.op.names)
1644
    else:
1645
      self.wanted = locking.ALL_SET
1646

    
1647
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1648
    if self.do_locking:
1649
      # if we don't request only static fields, we need to lock the nodes
1650
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1651

    
1652

    
1653
  def CheckPrereq(self):
1654
    """Check prerequisites.
1655

1656
    """
1657
    # The validation of the node list is done in the _GetWantedNodes,
1658
    # if non empty, and if empty, there's no validation to do
1659
    pass
1660

    
1661
  def Exec(self, feedback_fn):
1662
    """Computes the list of nodes and their attributes.
1663

1664
    """
1665
    all_info = self.cfg.GetAllNodesInfo()
1666
    if self.do_locking:
1667
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1668
    elif self.wanted != locking.ALL_SET:
1669
      nodenames = self.wanted
1670
      missing = set(nodenames).difference(all_info.keys())
1671
      if missing:
1672
        raise errors.OpExecError(
1673
          "Some nodes were removed before retrieving their data: %s" % missing)
1674
    else:
1675
      nodenames = all_info.keys()
1676

    
1677
    nodenames = utils.NiceSort(nodenames)
1678
    nodelist = [all_info[name] for name in nodenames]
1679

    
1680
    # begin data gathering
1681

    
1682
    if self.do_locking:
1683
      live_data = {}
1684
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1685
                                          self.cfg.GetHypervisorType())
1686
      for name in nodenames:
1687
        nodeinfo = node_data[name]
1688
        if not nodeinfo.failed and nodeinfo.data:
1689
          nodeinfo = nodeinfo.data
1690
          fn = utils.TryConvert
1691
          live_data[name] = {
1692
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1693
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1694
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1695
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1696
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1697
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1698
            "bootid": nodeinfo.get('bootid', None),
1699
            }
1700
        else:
1701
          live_data[name] = {}
1702
    else:
1703
      live_data = dict.fromkeys(nodenames, {})
1704

    
1705
    node_to_primary = dict([(name, set()) for name in nodenames])
1706
    node_to_secondary = dict([(name, set()) for name in nodenames])
1707

    
1708
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1709
                             "sinst_cnt", "sinst_list"))
1710
    if inst_fields & frozenset(self.op.output_fields):
1711
      instancelist = self.cfg.GetInstanceList()
1712

    
1713
      for instance_name in instancelist:
1714
        inst = self.cfg.GetInstanceInfo(instance_name)
1715
        if inst.primary_node in node_to_primary:
1716
          node_to_primary[inst.primary_node].add(inst.name)
1717
        for secnode in inst.secondary_nodes:
1718
          if secnode in node_to_secondary:
1719
            node_to_secondary[secnode].add(inst.name)
1720

    
1721
    master_node = self.cfg.GetMasterNode()
1722

    
1723
    # end data gathering
1724

    
1725
    output = []
1726
    for node in nodelist:
1727
      node_output = []
1728
      for field in self.op.output_fields:
1729
        if field == "name":
1730
          val = node.name
1731
        elif field == "pinst_list":
1732
          val = list(node_to_primary[node.name])
1733
        elif field == "sinst_list":
1734
          val = list(node_to_secondary[node.name])
1735
        elif field == "pinst_cnt":
1736
          val = len(node_to_primary[node.name])
1737
        elif field == "sinst_cnt":
1738
          val = len(node_to_secondary[node.name])
1739
        elif field == "pip":
1740
          val = node.primary_ip
1741
        elif field == "sip":
1742
          val = node.secondary_ip
1743
        elif field == "tags":
1744
          val = list(node.GetTags())
1745
        elif field == "serial_no":
1746
          val = node.serial_no
1747
        elif field == "master_candidate":
1748
          val = node.master_candidate
1749
        elif field == "master":
1750
          val = node.name == master_node
1751
        elif self._FIELDS_DYNAMIC.Matches(field):
1752
          val = live_data[node.name].get(field, None)
1753
        else:
1754
          raise errors.ParameterError(field)
1755
        node_output.append(val)
1756
      output.append(node_output)
1757

    
1758
    return output
1759

    
1760

    
1761
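# Illustrative sketch (not part of the original module): the query LUs above
# return rows of bare values ordered exactly like the requested output_fields,
# so callers typically zip the two back together.  The helper name and the
# sample field list are hypothetical.
def _ExampleRowToDict(output_fields, row):
  """Pair a result row with the field names that produced it."""
  return dict(zip(output_fields, row))

# Example: _ExampleRowToDict(["name", "pinst_cnt"], ["node1", 3]) returns
# {"name": "node1", "pinst_cnt": 3}.
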
class LUQueryNodeVolumes(NoHooksLU):
1762
  """Logical unit for getting volumes on node(s).
1763

1764
  """
1765
  _OP_REQP = ["nodes", "output_fields"]
1766
  REQ_BGL = False
1767
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1768
  _FIELDS_STATIC = utils.FieldSet("node")
1769

    
1770
  def ExpandNames(self):
1771
    _CheckOutputFields(static=self._FIELDS_STATIC,
1772
                       dynamic=self._FIELDS_DYNAMIC,
1773
                       selected=self.op.output_fields)
1774

    
1775
    self.needed_locks = {}
1776
    self.share_locks[locking.LEVEL_NODE] = 1
1777
    if not self.op.nodes:
1778
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1779
    else:
1780
      self.needed_locks[locking.LEVEL_NODE] = \
1781
        _GetWantedNodes(self, self.op.nodes)
1782

    
1783
  def CheckPrereq(self):
1784
    """Check prerequisites.
1785

1786
    This checks that the fields required are valid output fields.
1787

1788
    """
1789
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1790

    
1791
  def Exec(self, feedback_fn):
1792
    """Computes the list of nodes and their attributes.
1793

1794
    """
1795
    nodenames = self.nodes
1796
    volumes = self.rpc.call_node_volumes(nodenames)
1797

    
1798
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1799
             in self.cfg.GetInstanceList()]
1800

    
1801
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1802

    
1803
    output = []
1804
    for node in nodenames:
1805
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1806
        continue
1807

    
1808
      node_vols = volumes[node].data[:]
1809
      node_vols.sort(key=lambda vol: vol['dev'])
1810

    
1811
      for vol in node_vols:
1812
        node_output = []
1813
        for field in self.op.output_fields:
1814
          if field == "node":
1815
            val = node
1816
          elif field == "phys":
1817
            val = vol['dev']
1818
          elif field == "vg":
1819
            val = vol['vg']
1820
          elif field == "name":
1821
            val = vol['name']
1822
          elif field == "size":
1823
            val = int(float(vol['size']))
1824
          elif field == "instance":
1825
            for inst in ilist:
1826
              if node not in lv_by_node[inst]:
1827
                continue
1828
              if vol['name'] in lv_by_node[inst][node]:
1829
                val = inst.name
1830
                break
1831
            else:
1832
              val = '-'
1833
          else:
1834
            raise errors.ParameterError(field)
1835
          node_output.append(str(val))
1836

    
1837
        output.append(node_output)
1838

    
1839
    return output
1840

    
1841

    
1842
class LUAddNode(LogicalUnit):
1843
  """Logical unit for adding node to the cluster.
1844

1845
  """
1846
  HPATH = "node-add"
1847
  HTYPE = constants.HTYPE_NODE
1848
  _OP_REQP = ["node_name"]
1849

    
1850
  def BuildHooksEnv(self):
1851
    """Build hooks env.
1852

1853
    This will run on all nodes before, and on all nodes + the new node after.
1854

1855
    """
1856
    env = {
1857
      "OP_TARGET": self.op.node_name,
1858
      "NODE_NAME": self.op.node_name,
1859
      "NODE_PIP": self.op.primary_ip,
1860
      "NODE_SIP": self.op.secondary_ip,
1861
      }
1862
    nodes_0 = self.cfg.GetNodeList()
1863
    nodes_1 = nodes_0 + [self.op.node_name, ]
1864
    return env, nodes_0, nodes_1
1865

    
1866
  def CheckPrereq(self):
1867
    """Check prerequisites.
1868

1869
    This checks:
1870
     - the new node is not already in the config
1871
     - it is resolvable
1872
     - its parameters (single/dual homed) matches the cluster
1873

1874
    Any errors are signalled by raising errors.OpPrereqError.
1875

1876
    """
1877
    node_name = self.op.node_name
1878
    cfg = self.cfg
1879

    
1880
    dns_data = utils.HostInfo(node_name)
1881

    
1882
    node = dns_data.name
1883
    primary_ip = self.op.primary_ip = dns_data.ip
1884
    secondary_ip = getattr(self.op, "secondary_ip", None)
1885
    if secondary_ip is None:
1886
      secondary_ip = primary_ip
1887
    if not utils.IsValidIP(secondary_ip):
1888
      raise errors.OpPrereqError("Invalid secondary IP given")
1889
    self.op.secondary_ip = secondary_ip
1890

    
1891
    node_list = cfg.GetNodeList()
1892
    if not self.op.readd and node in node_list:
1893
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1894
                                 node)
1895
    elif self.op.readd and node not in node_list:
1896
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1897

    
1898
    for existing_node_name in node_list:
1899
      existing_node = cfg.GetNodeInfo(existing_node_name)
1900

    
1901
      if self.op.readd and node == existing_node_name:
1902
        if (existing_node.primary_ip != primary_ip or
1903
            existing_node.secondary_ip != secondary_ip):
1904
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1905
                                     " address configuration as before")
1906
        continue
1907

    
1908
      if (existing_node.primary_ip == primary_ip or
1909
          existing_node.secondary_ip == primary_ip or
1910
          existing_node.primary_ip == secondary_ip or
1911
          existing_node.secondary_ip == secondary_ip):
1912
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1913
                                   " existing node %s" % existing_node.name)
1914

    
1915
    # check that the type of the node (single versus dual homed) is the
1916
    # same as for the master
1917
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1918
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1919
    newbie_singlehomed = secondary_ip == primary_ip
1920
    if master_singlehomed != newbie_singlehomed:
1921
      if master_singlehomed:
1922
        raise errors.OpPrereqError("The master has no private ip but the"
1923
                                   " new node has one")
1924
      else:
1925
        raise errors.OpPrereqError("The master has a private ip but the"
1926
                                   " new node doesn't have one")
1927

    
1928
    # check reachability
1929
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1930
      raise errors.OpPrereqError("Node not reachable by ping")
1931

    
1932
    if not newbie_singlehomed:
1933
      # check reachability from my secondary ip to newbie's secondary ip
1934
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1935
                           source=myself.secondary_ip):
1936
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1937
                                   " based ping to noded port")
1938

    
1939
    self.new_node = objects.Node(name=node,
1940
                                 primary_ip=primary_ip,
1941
                                 secondary_ip=secondary_ip)
1942

    
1943
  def Exec(self, feedback_fn):
1944
    """Adds the new node to the cluster.
1945

1946
    """
1947
    new_node = self.new_node
1948
    node = new_node.name
1949

    
1950
    # check connectivity
1951
    result = self.rpc.call_version([node])[node]
1952
    result.Raise()
1953
    if result.data:
1954
      if constants.PROTOCOL_VERSION == result.data:
1955
        logging.info("Communication to node %s fine, sw version %s match",
1956
                     node, result.data)
1957
      else:
1958
        raise errors.OpExecError("Version mismatch master version %s,"
1959
                                 " node version %s" %
1960
                                 (constants.PROTOCOL_VERSION, result.data))
1961
    else:
1962
      raise errors.OpExecError("Cannot get version from the new node")
1963

    
1964
    # setup ssh on node
1965
    logging.info("Copy ssh key to node %s", node)
1966
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1967
    keyarray = []
1968
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1969
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1970
                priv_key, pub_key]
1971

    
1972
    for i in keyfiles:
1973
      f = open(i, 'r')
1974
      try:
1975
        keyarray.append(f.read())
1976
      finally:
1977
        f.close()
1978

    
1979
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1980
                                    keyarray[2],
1981
                                    keyarray[3], keyarray[4], keyarray[5])
1982

    
1983
    if result.failed or not result.data:
1984
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1985

    
1986
    # Add node to our /etc/hosts, and add key to known_hosts
1987
    utils.AddHostToEtcHosts(new_node.name)
1988

    
1989
    if new_node.secondary_ip != new_node.primary_ip:
1990
      result = self.rpc.call_node_has_ip_address(new_node.name,
1991
                                                 new_node.secondary_ip)
1992
      if result.failed or not result.data:
1993
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1994
                                 " you gave (%s). Please fix and re-run this"
1995
                                 " command." % new_node.secondary_ip)
1996

    
1997
    node_verify_list = [self.cfg.GetMasterNode()]
1998
    node_verify_param = {
1999
      'nodelist': [node],
2000
      # TODO: do a node-net-test as well?
2001
    }
2002

    
2003
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2004
                                       self.cfg.GetClusterName())
2005
    for verifier in node_verify_list:
2006
      if result.failed or not result[verifier].data:
2007
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2008
                                 " for remote verification" % verifier)
2009
      if result[verifier].data['nodelist']:
2010
        for failed in result[verifier].data['nodelist']:
2011
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2012
                      (verifier, result[verifier].data['nodelist'][failed]))
2013
        raise errors.OpExecError("ssh/hostname verification failed.")
2014

    
2015
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2016
    # including the node just added
2017
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2018
    dist_nodes = self.cfg.GetNodeList()
2019
    if not self.op.readd:
2020
      dist_nodes.append(node)
2021
    if myself.name in dist_nodes:
2022
      dist_nodes.remove(myself.name)
2023

    
2024
    logging.debug("Copying hosts and known_hosts to all nodes")
2025
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2026
      result = self.rpc.call_upload_file(dist_nodes, fname)
2027
      for to_node in dist_nodes:
2028
        if result[to_node].failed or not result[to_node]:
2029
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2030

    
2031
    to_copy = []
2032
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2033
      to_copy.append(constants.VNC_PASSWORD_FILE)
2034
    for fname in to_copy:
2035
      result = self.rpc.call_upload_file([node], fname)
2036
      if result[node].failed or not result[node]:
2037
        logging.error("Could not copy file %s to node %s", fname, node)
2038

    
2039
    if self.op.readd:
2040
      self.context.ReaddNode(new_node)
2041
    else:
2042
      self.context.AddNode(new_node)
2043

    
2044

    
2045
class LUSetNodeParams(LogicalUnit):
2046
  """Modifies the parameters of a node.
2047

2048
  """
2049
  HPATH = "node-modify"
2050
  HTYPE = constants.HTYPE_NODE
2051
  _OP_REQP = ["node_name"]
2052
  REQ_BGL = False
2053

    
2054
  def CheckArguments(self):
2055
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2056
    if node_name is None:
2057
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2058
    self.op.node_name = node_name
2059
    if not hasattr(self.op, 'master_candidate'):
2060
      raise errors.OpPrereqError("Please pass at least one modification")
2061
    self.op.master_candidate = bool(self.op.master_candidate)
2062

    
2063
  def ExpandNames(self):
2064
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2065

    
2066
  def BuildHooksEnv(self):
2067
    """Build hooks env.
2068

2069
    This runs on the master node.
2070

2071
    """
2072
    env = {
2073
      "OP_TARGET": self.op.node_name,
2074
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2075
      }
2076
    nl = [self.cfg.GetMasterNode(),
2077
          self.op.node_name]
2078
    return env, nl, nl
2079

    
2080
  def CheckPrereq(self):
2081
    """Check prerequisites.
2082

2083
    This only checks the instance list against the existing names.
2084

2085
    """
2086
    force = self.force = self.op.force
2087

    
2088
    if self.op.master_candidate == False:
2089
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2090
      node_info = self.cfg.GetAllNodesInfo().values()
2091
      num_candidates = len([node for node in node_info
2092
                            if node.master_candidate])
2093
      if num_candidates <= cp_size:
2094
        msg = ("Not enough master candidates (desired"
2095
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2096
        if force:
2097
          self.LogWarning(msg)
2098
        else:
2099
          raise errors.OpPrereqError(msg)
2100

    
2101
    return
2102

    
2103
  def Exec(self, feedback_fn):
2104
    """Modifies a node.
2105

2106
    """
2107
    node = self.cfg.GetNodeInfo(self.op.node_name)
2108

    
2109
    result = []
2110

    
2111
    if self.op.master_candidate is not None:
2112
      node.master_candidate = self.op.master_candidate
2113
      result.append(("master_candidate", str(self.op.master_candidate)))
2114

    
2115
    # this will trigger configuration file update, if needed
2116
    self.cfg.Update(node)
2117
    # this will trigger job queue propagation or cleanup
2118
    self.context.ReaddNode(node)
2119

    
2120
    return result
2121

    
2122

    
2123
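# Illustrative sketch (not part of the original module): the master candidate
# accounting done in LUSetNodeParams.CheckPrereq above, reduced to plain data.
# Node records are represented as dicts; the helper name is hypothetical.
def _ExampleCanDemoteCandidate(node_info, cp_size):
  """Return True if demoting one candidate keeps the pool at cp_size."""
  num_candidates = len([node for node in node_info
                        if node.get("master_candidate")])
  return num_candidates > cp_size

# Example: with three candidates and candidate_pool_size = 3, demotion would
# leave the pool short, so _ExampleCanDemoteCandidate(..., 3) returns False.
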
class LUQueryClusterInfo(NoHooksLU):
2124
  """Query cluster configuration.
2125

2126
  """
2127
  _OP_REQP = []
2128
  REQ_BGL = False
2129

    
2130
  def ExpandNames(self):
2131
    self.needed_locks = {}
2132

    
2133
  def CheckPrereq(self):
2134
    """No prerequsites needed for this LU.
2135

2136
    """
2137
    pass
2138

    
2139
  def Exec(self, feedback_fn):
2140
    """Return cluster config.
2141

2142
    """
2143
    cluster = self.cfg.GetClusterInfo()
2144
    result = {
2145
      "software_version": constants.RELEASE_VERSION,
2146
      "protocol_version": constants.PROTOCOL_VERSION,
2147
      "config_version": constants.CONFIG_VERSION,
2148
      "os_api_version": constants.OS_API_VERSION,
2149
      "export_version": constants.EXPORT_VERSION,
2150
      "architecture": (platform.architecture()[0], platform.machine()),
2151
      "name": cluster.cluster_name,
2152
      "master": cluster.master_node,
2153
      "default_hypervisor": cluster.default_hypervisor,
2154
      "enabled_hypervisors": cluster.enabled_hypervisors,
2155
      "hvparams": cluster.hvparams,
2156
      "beparams": cluster.beparams,
2157
      "candidate_pool_size": cluster.candidate_pool_size,
2158
      }
2159

    
2160
    return result
2161

    
2162

    
2163
class LUQueryConfigValues(NoHooksLU):
2164
  """Return configuration values.
2165

2166
  """
2167
  _OP_REQP = []
2168
  REQ_BGL = False
2169
  _FIELDS_DYNAMIC = utils.FieldSet()
2170
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2171

    
2172
  def ExpandNames(self):
2173
    self.needed_locks = {}
2174

    
2175
    _CheckOutputFields(static=self._FIELDS_STATIC,
2176
                       dynamic=self._FIELDS_DYNAMIC,
2177
                       selected=self.op.output_fields)
2178

    
2179
  def CheckPrereq(self):
2180
    """No prerequisites.
2181

2182
    """
2183
    pass
2184

    
2185
  def Exec(self, feedback_fn):
2186
    """Dump a representation of the cluster config to the standard output.
2187

2188
    """
2189
    values = []
2190
    for field in self.op.output_fields:
2191
      if field == "cluster_name":
2192
        entry = self.cfg.GetClusterName()
2193
      elif field == "master_node":
2194
        entry = self.cfg.GetMasterNode()
2195
      elif field == "drain_flag":
2196
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2197
      else:
2198
        raise errors.ParameterError(field)
2199
      values.append(entry)
2200
    return values
2201

    
2202

    
2203
class LUActivateInstanceDisks(NoHooksLU):
2204
  """Bring up an instance's disks.
2205

2206
  """
2207
  _OP_REQP = ["instance_name"]
2208
  REQ_BGL = False
2209

    
2210
  def ExpandNames(self):
2211
    self._ExpandAndLockInstance()
2212
    self.needed_locks[locking.LEVEL_NODE] = []
2213
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2214

    
2215
  def DeclareLocks(self, level):
2216
    if level == locking.LEVEL_NODE:
2217
      self._LockInstancesNodes()
2218

    
2219
  def CheckPrereq(self):
2220
    """Check prerequisites.
2221

2222
    This checks that the instance is in the cluster.
2223

2224
    """
2225
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2226
    assert self.instance is not None, \
2227
      "Cannot retrieve locked instance %s" % self.op.instance_name
2228

    
2229
  def Exec(self, feedback_fn):
2230
    """Activate the disks.
2231

2232
    """
2233
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2234
    if not disks_ok:
2235
      raise errors.OpExecError("Cannot activate block devices")
2236

    
2237
    return disks_info
2238

    
2239

    
2240
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2241
  """Prepare the block devices for an instance.
2242

2243
  This sets up the block devices on all nodes.
2244

2245
  @type lu: L{LogicalUnit}
2246
  @param lu: the logical unit on whose behalf we execute
2247
  @type instance: L{objects.Instance}
2248
  @param instance: the instance for whose disks we assemble
2249
  @type ignore_secondaries: boolean
2250
  @param ignore_secondaries: if true, errors on secondary nodes
2251
      won't result in an error return from the function
2252
  @return: False if the operation failed, otherwise a list of
2253
      (host, instance_visible_name, node_visible_name)
2254
      with the mapping from node devices to instance devices
2255

2256
  """
2257
  device_info = []
2258
  disks_ok = True
2259
  iname = instance.name
2260
  # With the two passes mechanism we try to reduce the window of
2261
  # opportunity for the race condition of switching DRBD to primary
2262
  # before handshaking occurred, but we do not eliminate it
2263

    
2264
  # The proper fix would be to wait (with some limits) until the
2265
  # connection has been made and drbd transitions from WFConnection
2266
  # into any other network-connected state (Connected, SyncTarget,
2267
  # SyncSource, etc.)
2268

    
2269
  # 1st pass, assemble on all nodes in secondary mode
2270
  for inst_disk in instance.disks:
2271
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2272
      lu.cfg.SetDiskID(node_disk, node)
2273
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2274
      if result.failed or not result:
2275
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2276
                           " (is_primary=False, pass=1)",
2277
                           inst_disk.iv_name, node)
2278
        if not ignore_secondaries:
2279
          disks_ok = False
2280

    
2281
  # FIXME: race condition on drbd migration to primary
2282

    
2283
  # 2nd pass, do only the primary node
2284
  for inst_disk in instance.disks:
2285
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2286
      if node != instance.primary_node:
2287
        continue
2288
      lu.cfg.SetDiskID(node_disk, node)
2289
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2290
      if result.failed or not result:
2291
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2292
                           " (is_primary=True, pass=2)",
2293
                           inst_disk.iv_name, node)
2294
        disks_ok = False
2295
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2296

    
2297
  # leave the disks configured for the primary node
2298
  # this is a workaround that would be fixed better by
2299
  # improving the logical/physical id handling
2300
  for disk in instance.disks:
2301
    lu.cfg.SetDiskID(disk, instance.primary_node)
2302

    
2303
  return disks_ok, device_info
2304

    
2305

    
2306
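# Illustrative sketch (not part of the original module): the two-pass ordering
# used by _AssembleInstanceDisks above, shown on plain (node, disk) pairs.  The
# first pass touches every node in secondary mode, the second pass revisits
# only the primary node; the helper name and data shapes are hypothetical.
def _ExampleTwoPassOrder(node_disk_pairs, primary_node):
  """Return the (node, disk, as_primary) calls in assembly order."""
  calls = []
  # 1st pass: assemble everywhere with is_primary=False
  for node, disk in node_disk_pairs:
    calls.append((node, disk, False))
  # 2nd pass: only the primary node is promoted
  for node, disk in node_disk_pairs:
    if node == primary_node:
      calls.append((node, disk, True))
  return calls

# Example: pairs [("nodeA", "sda"), ("nodeB", "sda")] with primary "nodeA"
# yield three calls, the last one being ("nodeA", "sda", True).
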
def _StartInstanceDisks(lu, instance, force):
2307
  """Start the disks of an instance.
2308

2309
  """
2310
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2311
                                           ignore_secondaries=force)
2312
  if not disks_ok:
2313
    _ShutdownInstanceDisks(lu, instance)
2314
    if force is not None and not force:
2315
      lu.proc.LogWarning("", hint="If the message above refers to a"
2316
                         " secondary node,"
2317
                         " you can retry the operation using '--force'.")
2318
    raise errors.OpExecError("Disk consistency error")
2319

    
2320

    
2321
class LUDeactivateInstanceDisks(NoHooksLU):
2322
  """Shutdown an instance's disks.
2323

2324
  """
2325
  _OP_REQP = ["instance_name"]
2326
  REQ_BGL = False
2327

    
2328
  def ExpandNames(self):
2329
    self._ExpandAndLockInstance()
2330
    self.needed_locks[locking.LEVEL_NODE] = []
2331
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2332

    
2333
  def DeclareLocks(self, level):
2334
    if level == locking.LEVEL_NODE:
2335
      self._LockInstancesNodes()
2336

    
2337
  def CheckPrereq(self):
2338
    """Check prerequisites.
2339

2340
    This checks that the instance is in the cluster.
2341

2342
    """
2343
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2344
    assert self.instance is not None, \
2345
      "Cannot retrieve locked instance %s" % self.op.instance_name
2346

    
2347
  def Exec(self, feedback_fn):
2348
    """Deactivate the disks
2349

2350
    """
2351
    instance = self.instance
2352
    _SafeShutdownInstanceDisks(self, instance)
2353

    
2354

    
2355
def _SafeShutdownInstanceDisks(lu, instance):
2356
  """Shutdown block devices of an instance.
2357

2358
  This function checks if an instance is running, before calling
2359
  _ShutdownInstanceDisks.
2360

2361
  """
2362
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2363
                                      [instance.hypervisor])
2364
  ins_l = ins_l[instance.primary_node]
2365
  if ins_l.failed or not isinstance(ins_l.data, list):
2366
    raise errors.OpExecError("Can't contact node '%s'" %
2367
                             instance.primary_node)
2368

    
2369
  if instance.name in ins_l.data:
2370
    raise errors.OpExecError("Instance is running, can't shutdown"
2371
                             " block devices.")
2372

    
2373
  _ShutdownInstanceDisks(lu, instance)
2374

    
2375

    
2376
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2377
  """Shutdown block devices of an instance.
2378

2379
  This does the shutdown on all nodes of the instance.
2380

2381
  If ignore_primary is true, errors on the primary node are
  ignored.
2383

2384
  """
2385
  result = True
2386
  for disk in instance.disks:
2387
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2388
      lu.cfg.SetDiskID(top_disk, node)
2389
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2390
      if result.failed or not result.data:
2391
        logging.error("Could not shutdown block device %s on node %s",
2392
                      disk.iv_name, node)
2393
        if not ignore_primary or node != instance.primary_node:
2394
          result = False
2395
  return result
2396

    
2397

    
2398
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2399
  """Checks if a node has enough free memory.
2400

2401
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2405

2406
  @type lu: C{LogicalUnit}
2407
  @param lu: a logical unit from which we get configuration data
2408
  @type node: C{str}
2409
  @param node: the node to check
2410
  @type reason: C{str}
2411
  @param reason: string to use in the error message
2412
  @type requested: C{int}
2413
  @param requested: the amount of memory in MiB to check for
2414
  @type hypervisor: C{str}
2415
  @param hypervisor: the hypervisor to ask for memory stats
2416
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2417
      we cannot check the node
2418

2419
  """
2420
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2421
  nodeinfo[node].Raise()
2422
  free_mem = nodeinfo[node].data.get('memory_free')
2423
  if not isinstance(free_mem, int):
2424
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2425
                             " was '%s'" % (node, free_mem))
2426
  if requested > free_mem:
2427
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2428
                             " needed %s MiB, available %s MiB" %
2429
                             (node, reason, requested, free_mem))
2430

    
2431

    
2432
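# Illustrative sketch (not part of the original module): the core comparison
# performed by _CheckNodeFreeMemory above, with the RPC layer stripped away.
# Values are MiB; the helper name is hypothetical.
def _ExampleHasEnoughMemory(free_mem, requested):
  """Return True if the reported free memory covers the request."""
  if not isinstance(free_mem, int):
    # an unparseable report is treated as a failure, as in the check above
    return False
  return free_mem >= requested

# Example: _ExampleHasEnoughMemory(2048, 512) is True, while
# _ExampleHasEnoughMemory("unknown", 512) is False.
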
class LUStartupInstance(LogicalUnit):
2433
  """Starts an instance.
2434

2435
  """
2436
  HPATH = "instance-start"
2437
  HTYPE = constants.HTYPE_INSTANCE
2438
  _OP_REQP = ["instance_name", "force"]
2439
  REQ_BGL = False
2440

    
2441
  def ExpandNames(self):
2442
    self._ExpandAndLockInstance()
2443

    
2444
  def BuildHooksEnv(self):
2445
    """Build hooks env.
2446

2447
    This runs on master, primary and secondary nodes of the instance.
2448

2449
    """
2450
    env = {
2451
      "FORCE": self.op.force,
2452
      }
2453
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2454
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2455
          list(self.instance.secondary_nodes))
2456
    return env, nl, nl
2457

    
2458
  def CheckPrereq(self):
2459
    """Check prerequisites.
2460

2461
    This checks that the instance is in the cluster.
2462

2463
    """
2464
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2465
    assert self.instance is not None, \
2466
      "Cannot retrieve locked instance %s" % self.op.instance_name
2467

    
2468
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2469
    # check bridges existence
2470
    _CheckInstanceBridgesExist(self, instance)
2471

    
2472
    _CheckNodeFreeMemory(self, instance.primary_node,
2473
                         "starting instance %s" % instance.name,
2474
                         bep[constants.BE_MEMORY], instance.hypervisor)
2475

    
2476
  def Exec(self, feedback_fn):
2477
    """Start the instance.
2478

2479
    """
2480
    instance = self.instance
2481
    force = self.op.force
2482
    extra_args = getattr(self.op, "extra_args", "")
2483

    
2484
    self.cfg.MarkInstanceUp(instance.name)
2485

    
2486
    node_current = instance.primary_node
2487

    
2488
    _StartInstanceDisks(self, instance, force)
2489

    
2490
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2491
    if result.failed or not result.data:
2492
      _ShutdownInstanceDisks(self, instance)
2493
      raise errors.OpExecError("Could not start instance")
2494

    
2495

    
2496
class LURebootInstance(LogicalUnit):
2497
  """Reboot an instance.
2498

2499
  """
2500
  HPATH = "instance-reboot"
2501
  HTYPE = constants.HTYPE_INSTANCE
2502
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2503
  REQ_BGL = False
2504

    
2505
  def ExpandNames(self):
2506
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2507
                                   constants.INSTANCE_REBOOT_HARD,
2508
                                   constants.INSTANCE_REBOOT_FULL]:
2509
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2510
                                  (constants.INSTANCE_REBOOT_SOFT,
2511
                                   constants.INSTANCE_REBOOT_HARD,
2512
                                   constants.INSTANCE_REBOOT_FULL))
2513
    self._ExpandAndLockInstance()
2514

    
2515
  def BuildHooksEnv(self):
2516
    """Build hooks env.
2517

2518
    This runs on master, primary and secondary nodes of the instance.
2519

2520
    """
2521
    env = {
2522
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2523
      }
2524
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2525
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2526
          list(self.instance.secondary_nodes))
2527
    return env, nl, nl
2528

    
2529
  def CheckPrereq(self):
2530
    """Check prerequisites.
2531

2532
    This checks that the instance is in the cluster.
2533

2534
    """
2535
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2536
    assert self.instance is not None, \
2537
      "Cannot retrieve locked instance %s" % self.op.instance_name
2538

    
2539
    # check bridges existence
2540
    _CheckInstanceBridgesExist(self, instance)
2541

    
2542
  def Exec(self, feedback_fn):
2543
    """Reboot the instance.
2544

2545
    """
2546
    instance = self.instance
2547
    ignore_secondaries = self.op.ignore_secondaries
2548
    reboot_type = self.op.reboot_type
2549
    extra_args = getattr(self.op, "extra_args", "")
2550

    
2551
    node_current = instance.primary_node
2552

    
2553
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2554
                       constants.INSTANCE_REBOOT_HARD]:
2555
      result = self.rpc.call_instance_reboot(node_current, instance,
2556
                                             reboot_type, extra_args)
2557
      if result.failed or not result.data:
2558
        raise errors.OpExecError("Could not reboot instance")
2559
    else:
2560
      if not self.rpc.call_instance_shutdown(node_current, instance):
2561
        raise errors.OpExecError("could not shutdown instance for full reboot")
2562
      _ShutdownInstanceDisks(self, instance)
2563
      _StartInstanceDisks(self, instance, ignore_secondaries)
2564
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2565
      if result.failed or not result.data:
2566
        _ShutdownInstanceDisks(self, instance)
2567
        raise errors.OpExecError("Could not start instance for full reboot")
2568

    
2569
    self.cfg.MarkInstanceUp(instance.name)
2570

    
2571

    
2572
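# Illustrative sketch (not part of the original module): the dispatch performed
# by LURebootInstance.Exec above.  Soft and hard reboots are handled by a
# single hypervisor call, while a full reboot is a shutdown, a disk cycle and
# a fresh start; the helper name and the returned step labels are hypothetical.
def _ExampleRebootSteps(reboot_type):
  """Return the high-level steps used for each reboot type."""
  if reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                     constants.INSTANCE_REBOOT_HARD):
    return ["instance_reboot"]
  return ["instance_shutdown", "shutdown_disks", "start_disks",
          "instance_start"]

# Example: _ExampleRebootSteps(constants.INSTANCE_REBOOT_FULL) returns the
# four-step sequence used by the full-reboot branch.
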
class LUShutdownInstance(LogicalUnit):
2573
  """Shutdown an instance.
2574

2575
  """
2576
  HPATH = "instance-stop"
2577
  HTYPE = constants.HTYPE_INSTANCE
2578
  _OP_REQP = ["instance_name"]
2579
  REQ_BGL = False
2580

    
2581
  def ExpandNames(self):
2582
    self._ExpandAndLockInstance()
2583

    
2584
  def BuildHooksEnv(self):
2585
    """Build hooks env.
2586

2587
    This runs on master, primary and secondary nodes of the instance.
2588

2589
    """
2590
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2591
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2592
          list(self.instance.secondary_nodes))
2593
    return env, nl, nl
2594

    
2595
  def CheckPrereq(self):
2596
    """Check prerequisites.
2597

2598
    This checks that the instance is in the cluster.
2599

2600
    """
2601
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2602
    assert self.instance is not None, \
2603
      "Cannot retrieve locked instance %s" % self.op.instance_name
2604

    
2605
  def Exec(self, feedback_fn):
2606
    """Shutdown the instance.
2607

2608
    """
2609
    instance = self.instance
2610
    node_current = instance.primary_node
2611
    self.cfg.MarkInstanceDown(instance.name)
2612
    result = self.rpc.call_instance_shutdown(node_current, instance)
2613
    if result.failed or not result.data:
2614
      self.proc.LogWarning("Could not shutdown instance")
2615

    
2616
    _ShutdownInstanceDisks(self, instance)
2617

    
2618

    
2619
class LUReinstallInstance(LogicalUnit):
2620
  """Reinstall an instance.
2621

2622
  """
2623
  HPATH = "instance-reinstall"
2624
  HTYPE = constants.HTYPE_INSTANCE
2625
  _OP_REQP = ["instance_name"]
2626
  REQ_BGL = False
2627

    
2628
  def ExpandNames(self):
2629
    self._ExpandAndLockInstance()
2630

    
2631
  def BuildHooksEnv(self):
2632
    """Build hooks env.
2633

2634
    This runs on master, primary and secondary nodes of the instance.
2635

2636
    """
2637
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2638
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2639
          list(self.instance.secondary_nodes))
2640
    return env, nl, nl
2641

    
2642
  def CheckPrereq(self):
2643
    """Check prerequisites.
2644

2645
    This checks that the instance is in the cluster and is not running.
2646

2647
    """
2648
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2649
    assert instance is not None, \
2650
      "Cannot retrieve locked instance %s" % self.op.instance_name
2651

    
2652
    if instance.disk_template == constants.DT_DISKLESS:
2653
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2654
                                 self.op.instance_name)
2655
    if instance.status != "down":
2656
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2657
                                 self.op.instance_name)
2658
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2659
                                              instance.name,
2660
                                              instance.hypervisor)
2661
    if remote_info.failed or remote_info.data:
2662
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2663
                                 (self.op.instance_name,
2664
                                  instance.primary_node))
2665

    
2666
    self.op.os_type = getattr(self.op, "os_type", None)
2667
    if self.op.os_type is not None:
2668
      # OS verification
2669
      pnode = self.cfg.GetNodeInfo(
2670
        self.cfg.ExpandNodeName(instance.primary_node))
2671
      if pnode is None:
2672
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2673
                                   self.op.pnode)
2674
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2675
      result.Raise()
2676
      if not isinstance(result.data, objects.OS):
2677
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2678
                                   " primary node"  % self.op.os_type)
2679

    
2680
    self.instance = instance
2681

    
2682
  def Exec(self, feedback_fn):
2683
    """Reinstall the instance.
2684

2685
    """
2686
    inst = self.instance
2687

    
2688
    if self.op.os_type is not None:
2689
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2690
      inst.os = self.op.os_type
2691
      self.cfg.Update(inst)
2692

    
2693
    _StartInstanceDisks(self, inst, None)
2694
    try:
2695
      feedback_fn("Running the instance OS create scripts...")
2696
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2697
      result.Raise()
2698
      if not result.data:
2699
        raise errors.OpExecError("Could not install OS for instance %s"
2700
                                 " on node %s" %
2701
                                 (inst.name, inst.primary_node))
2702
    finally:
2703
      _ShutdownInstanceDisks(self, inst)
2704

    
2705

    
2706
class LURenameInstance(LogicalUnit):
2707
  """Rename an instance.
2708

2709
  """
2710
  HPATH = "instance-rename"
2711
  HTYPE = constants.HTYPE_INSTANCE
2712
  _OP_REQP = ["instance_name", "new_name"]
2713

    
2714
  def BuildHooksEnv(self):
2715
    """Build hooks env.
2716

2717
    This runs on master, primary and secondary nodes of the instance.
2718

2719
    """
2720
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2721
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2722
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2723
          list(self.instance.secondary_nodes))
2724
    return env, nl, nl
2725

    
2726
  def CheckPrereq(self):
2727
    """Check prerequisites.
2728

2729
    This checks that the instance is in the cluster and is not running.
2730

2731
    """
2732
    instance = self.cfg.GetInstanceInfo(
2733
      self.cfg.ExpandInstanceName(self.op.instance_name))
2734
    if instance is None:
2735
      raise errors.OpPrereqError("Instance '%s' not known" %
2736
                                 self.op.instance_name)
2737
    if instance.status != "down":
2738
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2739
                                 self.op.instance_name)
2740
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2741
                                              instance.name,
2742
                                              instance.hypervisor)
2743
    remote_info.Raise()
2744
    if remote_info.data:
2745
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2746
                                 (self.op.instance_name,
2747
                                  instance.primary_node))
2748
    self.instance = instance
2749

    
2750
    # new name verification
2751
    name_info = utils.HostInfo(self.op.new_name)
2752

    
2753
    self.op.new_name = new_name = name_info.name
2754
    instance_list = self.cfg.GetInstanceList()
2755
    if new_name in instance_list:
2756
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2757
                                 new_name)
2758

    
2759
    if not getattr(self.op, "ignore_ip", False):
2760
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2761
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2762
                                   (name_info.ip, new_name))
2763

    
2764

    
2765
  def Exec(self, feedback_fn):
2766
    """Reinstall the instance.
2767

2768
    """
2769
    inst = self.instance
2770
    old_name = inst.name
2771

    
2772
    if inst.disk_template == constants.DT_FILE:
2773
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2774

    
2775
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2776
    # Change the instance lock. This is definitely safe while we hold the BGL
2777
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2778
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2779

    
2780
    # re-read the instance from the configuration after rename
2781
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2782

    
2783
    if inst.disk_template == constants.DT_FILE:
2784
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2785
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2786
                                                     old_file_storage_dir,
2787
                                                     new_file_storage_dir)
2788
      result.Raise()
2789
      if not result.data:
2790
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2791
                                 " directory '%s' to '%s' (but the instance"
2792
                                 " has been renamed in Ganeti)" % (
2793
                                 inst.primary_node, old_file_storage_dir,
2794
                                 new_file_storage_dir))
2795

    
2796
      if not result.data[0]:
2797
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2798
                                 " (but the instance has been renamed in"
2799
                                 " Ganeti)" % (old_file_storage_dir,
2800
                                               new_file_storage_dir))
2801

    
2802
    _StartInstanceDisks(self, inst, None)
2803
    try:
2804
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2805
                                                 old_name)
2806
      if result.failed or not result.data:
2807
        msg = ("Could not run OS rename script for instance %s on node %s"
2808
               " (but the instance has been renamed in Ganeti)" %
2809
               (inst.name, inst.primary_node))
2810
        self.proc.LogWarning(msg)
2811
    finally:
2812
      _ShutdownInstanceDisks(self, inst)
2813

    
2814

    
2815
class LURemoveInstance(LogicalUnit):
2816
  """Remove an instance.
2817

2818
  """
2819
  HPATH = "instance-remove"
2820
  HTYPE = constants.HTYPE_INSTANCE
2821
  _OP_REQP = ["instance_name", "ignore_failures"]
2822
  REQ_BGL = False
2823

    
2824
  def ExpandNames(self):
2825
    self._ExpandAndLockInstance()
2826
    self.needed_locks[locking.LEVEL_NODE] = []
2827
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2828

    
2829
  def DeclareLocks(self, level):
2830
    if level == locking.LEVEL_NODE:
2831
      self._LockInstancesNodes()
2832

    
2833
  def BuildHooksEnv(self):
2834
    """Build hooks env.
2835

2836
    This runs on master, primary and secondary nodes of the instance.
2837

2838
    """
2839
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2840
    nl = [self.cfg.GetMasterNode()]
2841
    return env, nl, nl
2842

    
2843
  def CheckPrereq(self):
2844
    """Check prerequisites.
2845

2846
    This checks that the instance is in the cluster.
2847

2848
    """
2849
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2850
    assert self.instance is not None, \
2851
      "Cannot retrieve locked instance %s" % self.op.instance_name
2852

    
2853
  def Exec(self, feedback_fn):
2854
    """Remove the instance.
2855

2856
    """
2857
    instance = self.instance
2858
    logging.info("Shutting down instance %s on node %s",
2859
                 instance.name, instance.primary_node)
2860

    
2861
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2862
    if result.failed or not result.data:
2863
      if self.op.ignore_failures:
2864
        feedback_fn("Warning: can't shutdown instance")
2865
      else:
2866
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2867
                                 (instance.name, instance.primary_node))
2868

    
2869
    logging.info("Removing block devices for instance %s", instance.name)
2870

    
2871
    if not _RemoveDisks(self, instance):
2872
      if self.op.ignore_failures:
2873
        feedback_fn("Warning: can't remove instance's disks")
2874
      else:
2875
        raise errors.OpExecError("Can't remove instance's disks")
2876

    
2877
    logging.info("Removing instance %s out of cluster config", instance.name)
2878

    
2879
    self.cfg.RemoveInstance(instance.name)
2880
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state", "admin_ram",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    "(disk).(size)/([0-9]+)",
                                    "(disk).(sizes)",
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
                                    "(nic).(macs|ips|bridges)",
                                    "(disk|nic).(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
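
# Added, illustrative summary (not part of the original source): the "status"
# field computed above collapses the admin flag and the live query into one
# value, roughly:
#   primary node unreachable      -> "ERROR_nodedown"
#   marked up,   found running    -> "running"
#   marked down, found running    -> "ERROR_up"
#   marked up,   not running      -> "ERROR_down"
#   marked down, not running      -> "ADMIN_down"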


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    if result.failed or not result.data:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None)
      if result.failed or not result.data:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
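
# Clarifying note (added, not in the original source): failover is shutdown on
# the current primary, disk deactivation, a configuration update that makes
# the old secondary the new primary, and, only if the instance was marked
# "up", disk activation plus instance start on that node; ignore_consistency
# additionally tolerates degraded disks and a failed shutdown on the source.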


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if new_id.failed or not new_id.data:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if new_id.failed or not new_id.data:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
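
# Added note (illustrative, not part of the original source): both creation
# helpers above recurse over device.children before touching the device
# itself; on a secondary node a device is only physically created once it, or
# one of its ancestors in the tree, reports CreateOnSecondary() as True (e.g.
# the LVs below a DRBD8 device), otherwise that branch of the recursion is a
# no-op.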


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
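
# Added note (illustrative, not part of the original source): the returned
# objects.Disk is a small tree,
#   LD_DRBD8(size, logical_id=(primary, secondary, port, p_minor, s_minor,
#                              shared_secret))
#     +- LD_LV data volume of <size> MB,  logical_id=(vg, names[0])
#     +- LD_LV metadata volume of 128 MB, logical_id=(vg, names[1])
# where the names typically come from _GenerateUniqueNames().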


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
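
# Added note (hypothetical example, not part of the original source): a call
# such as
#   _GenerateDiskTemplate(lu, constants.DT_DRBD8, "inst1", "node1", ["node2"],
#                         [{"size": 1024}], None, None, 0)
# would allocate one DRBD minor per node, generate "<uuid>.disk0_data" and
# "<uuid>.disk0_meta" LV names and return a single _GenerateDRBD8Branch tree
# with iv_name "disk/0"; "inst1"/"node1"/"node2" are made-up values.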


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if result.failed or not result.data:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result.data[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True
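
# Added note (illustrative, not part of the original source): disks are
# created on the secondary nodes first and on the primary last, and a False
# return from any step aborts the whole creation; the caller (for example
# LUCreateInstance.Exec) is then responsible for calling _RemoveDisks to
# clean up the partially created volumes.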


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      result = lu.rpc.call_blockdev_remove(node, disk)
      if result.failed or not result.data:
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]
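
# Added note (worked example, not part of the original source): for two DRBD8
# disks of 1024 MB and 2048 MB the required volume group space is
#   (1024 + 128) + (2048 + 128) = 3328 MB
# per node, while the file and diskless templates need no LVM space (None).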


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    info.Raise()
    if not info.data or not isinstance(info.data, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info.data))
    if not info.data[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info.data[1])


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.CheckBEParams(self.op.beparams)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
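
  # Added note (illustrative, not part of the original source): on success the
  # allocator's choice is simply copied back into the opcode, ial.nodes[0]
  # becoming the primary node and, for templates that need two nodes,
  # ial.nodes[1] the secondary.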

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]