root / lib / cmdlib.py @ f08ce603


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring the
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing these separately is better because:
118

119
      - ExpandNames is left as purely a lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods no longer need to worry about missing parameters.
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, etc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_', as this will
228
    be handled by the hooks runner. Also note that additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If no nodes are needed, an empty list (and not None) should be returned.
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
239
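  # A typical concrete BuildHooksEnv builds a small environment dict and
  # returns the node lists explicitly; a minimal sketch (compare
  # LURenameCluster.BuildHooksEnv below):
  #
  #   env = {"OP_TARGET": self.cfg.GetClusterName()}
  #   mn = self.cfg.GetMasterNode()
  #   return env, [mn], [mn]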

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instance's nodes, or
295
    to just lock primaries or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339
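# The class below is an illustrative sketch only, not an actual Ganeti
# logical unit (it has no opcode and is never dispatched): it shows how a
# concrete, concurrent LU is expected to combine _OP_REQP, CheckArguments,
# _ExpandAndLockInstance, DeclareLocks and _LockInstancesNodes. The 'force'
# attribute is hypothetical.
class _ExampleInstanceLU(NoHooksLU):
  """Sketch of a minimal instance-level LU (illustration only).

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    # purely syntactic: default an optional opcode parameter
    if not hasattr(self.op, "force"):
      self.op.force = False

  def ExpandNames(self):
    # expand and lock the instance now; compute the node locks later,
    # once the instance lock is actually held
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    # idempotent checks only; cache the instance object for Exec
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s and its nodes are locked" % self.instance.name)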

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = lu.cfg.GetInstanceList()
396
  return utils.NiceSort(wanted)
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417
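# A query LU typically declares its field sets as class attributes and calls
# this helper from ExpandNames; a minimal sketch (the field names below are
# illustrative, compare LUDiagnoseOS further down):
#
#   _FIELDS_STATIC = utils.FieldSet("name")
#   _FIELDS_DYNAMIC = utils.FieldSet("node_status")
#   ...
#   _CheckOutputFields(static=self._FIELDS_STATIC,
#                      dynamic=self._FIELDS_DYNAMIC,
#                      selected=self.op.output_fields)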

    
418
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419
                          memory, vcpus, nics):
420
  """Builds instance related env variables for hooks
421

422
  This builds the hook environment from individual variables.
423

424
  @type name: string
425
  @param name: the name of the instance
426
  @type primary_node: string
427
  @param primary_node: the name of the instance's primary node
428
  @type secondary_nodes: list
429
  @param secondary_nodes: list of secondary nodes as strings
430
  @type os_type: string
431
  @param os_type: the name of the instance's OS
432
  @type status: string
433
  @param status: the desired status of the instance
434
  @type memory: string
435
  @param memory: the memory size of the instance
436
  @type vcpus: string
437
  @param vcpus: the count of VCPUs the instance has
438
  @type nics: list
439
  @param nics: list of tuples (ip, bridge, mac) representing
440
      the NICs the instance has
441
  @rtype: dict
442
  @return: the hook environment for this instance
443

444
  """
445
  env = {
446
    "OP_TARGET": name,
447
    "INSTANCE_NAME": name,
448
    "INSTANCE_PRIMARY": primary_node,
449
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450
    "INSTANCE_OS_TYPE": os_type,
451
    "INSTANCE_STATUS": status,
452
    "INSTANCE_MEMORY": memory,
453
    "INSTANCE_VCPUS": vcpus,
454
  }
455

    
456
  if nics:
457
    nic_count = len(nics)
458
    for idx, (ip, bridge, mac) in enumerate(nics):
459
      if ip is None:
460
        ip = ""
461
      env["INSTANCE_NIC%d_IP" % idx] = ip
462
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464
  else:
465
    nic_count = 0
466

    
467
  env["INSTANCE_NIC_COUNT"] = nic_count
468

    
469
  return env
470

    
471
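# For an instance with a single NIC the resulting environment therefore
# contains, besides the fixed INSTANCE_* keys above:
#
#   INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_IP=<ip, or the empty string if unset>
#   INSTANCE_NIC0_BRIDGE=<bridge>
#   INSTANCE_NIC0_HWADDR=<mac>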

    
472
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473
  """Builds instance related env variables for hooks from an object.
474

475
  @type lu: L{LogicalUnit}
476
  @param lu: the logical unit on whose behalf we execute
477
  @type instance: L{objects.Instance}
478
  @param instance: the instance for which we should build the
479
      environment
480
  @type override: dict
481
  @param override: dictionary with key/values that will override
482
      our values
483
  @rtype: dict
484
  @return: the hook environment dictionary
485

486
  """
487
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
488
  args = {
489
    'name': instance.name,
490
    'primary_node': instance.primary_node,
491
    'secondary_nodes': instance.secondary_nodes,
492
    'os_type': instance.os,
493
    'status': instance.status,
494
    'memory': bep[constants.BE_MEMORY],
495
    'vcpus': bep[constants.BE_VCPUS],
496
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497
  }
498
  if override:
499
    args.update(override)
500
  return _BuildInstanceHookEnv(**args)
501

    
502

    
503
def _CheckInstanceBridgesExist(lu, instance):
504
  """Check that the brigdes needed by an instance exist.
505

506
  """
507
  # check bridges existence
508
  brlist = [nic.bridge for nic in instance.nics]
509
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510
  result.Raise()
511
  if not result.data:
512
    raise errors.OpPrereqError("One or more target bridges %s does not"
513
                               " exist on destination node '%s'" %
514
                               (brlist, instance.primary_node))
515

    
516

    
517
class LUDestroyCluster(NoHooksLU):
518
  """Logical unit for destroying the cluster.
519

520
  """
521
  _OP_REQP = []
522

    
523
  def CheckPrereq(self):
524
    """Check prerequisites.
525

526
    This checks whether the cluster is empty.
527

528
    Any errors are signalled by raising errors.OpPrereqError.
529

530
    """
531
    master = self.cfg.GetMasterNode()
532

    
533
    nodelist = self.cfg.GetNodeList()
534
    if len(nodelist) != 1 or nodelist[0] != master:
535
      raise errors.OpPrereqError("There are still %d node(s) in"
536
                                 " this cluster." % (len(nodelist) - 1))
537
    instancelist = self.cfg.GetInstanceList()
538
    if instancelist:
539
      raise errors.OpPrereqError("There are still %d instance(s) in"
540
                                 " this cluster." % len(instancelist))
541

    
542
  def Exec(self, feedback_fn):
543
    """Destroys the cluster.
544

545
    """
546
    master = self.cfg.GetMasterNode()
547
    result = self.rpc.call_node_stop_master(master, False)
548
    result.Raise()
549
    if not result.data:
550
      raise errors.OpExecError("Could not disable the master role")
551
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552
    utils.CreateBackup(priv_key)
553
    utils.CreateBackup(pub_key)
554
    return master
555

    
556

    
557
class LUVerifyCluster(LogicalUnit):
558
  """Verifies the cluster status.
559

560
  """
561
  HPATH = "cluster-verify"
562
  HTYPE = constants.HTYPE_CLUSTER
563
  _OP_REQP = ["skip_checks"]
564
  REQ_BGL = False
565

    
566
  def ExpandNames(self):
567
    self.needed_locks = {
568
      locking.LEVEL_NODE: locking.ALL_SET,
569
      locking.LEVEL_INSTANCE: locking.ALL_SET,
570
    }
571
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572

    
573
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574
                  node_result, feedback_fn, master_files):
575
    """Run multiple tests against a node.
576

577
    Test list:
578

579
      - compares ganeti version
580
      - checks vg existence and size > 20G
581
      - checks config file checksum
582
      - checks ssh to other nodes
583

584
    @type nodeinfo: L{objects.Node}
585
    @param nodeinfo: the node to check
586
    @param file_list: required list of files
587
    @param local_cksum: dictionary of local files and their checksums
588
    @param node_result: the results from the node
589
    @param feedback_fn: function used to accumulate results
590
    @param master_files: list of files that only masters should have
591

592
    """
593
    node = nodeinfo.name
594

    
595
    # main result, node_result should be a non-empty dict
596
    if not node_result or not isinstance(node_result, dict):
597
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598
      return True
599

    
600
    # compares ganeti version
601
    local_version = constants.PROTOCOL_VERSION
602
    remote_version = node_result.get('version', None)
603
    if not remote_version:
604
      feedback_fn("  - ERROR: connection to %s failed" % (node))
605
      return True
606

    
607
    if local_version != remote_version:
608
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609
                      (local_version, node, remote_version))
610
      return True
611

    
612
    # checks vg existence and size > 20G
613

    
614
    bad = False
615
    vglist = node_result.get(constants.NV_VGLIST, None)
616
    if not vglist:
617
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618
                      (node,))
619
      bad = True
620
    else:
621
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622
                                            constants.MIN_VG_SIZE)
623
      if vgstatus:
624
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625
        bad = True
626

    
627
    # checks config file checksum
628

    
629
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
630
    if not isinstance(remote_cksum, dict):
631
      bad = True
632
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
633
    else:
634
      for file_name in file_list:
635
        node_is_mc = nodeinfo.master_candidate
636
        must_have_file = file_name not in master_files
637
        if file_name not in remote_cksum:
638
          if node_is_mc or must_have_file:
639
            bad = True
640
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
641
        elif remote_cksum[file_name] != local_cksum[file_name]:
642
          if node_is_mc or must_have_file:
643
            bad = True
644
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645
          else:
646
            # not candidate and this is not a must-have file
647
            bad = True
648
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649
                        " '%s'" % file_name)
650
        else:
651
          # all good, except non-master/non-must have combination
652
          if not node_is_mc and not must_have_file:
653
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
654
                        " candidates" % file_name)
655

    
656
    # checks ssh to any
657

    
658
    if constants.NV_NODELIST not in node_result:
659
      bad = True
660
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661
    else:
662
      if node_result[constants.NV_NODELIST]:
663
        bad = True
664
        for node in node_result[constants.NV_NODELIST]:
665
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666
                          (node, node_result[constants.NV_NODELIST][node]))
667

    
668
    if constants.NV_NODENETTEST not in node_result:
669
      bad = True
670
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671
    else:
672
      if node_result[constants.NV_NODENETTEST]:
673
        bad = True
674
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675
        for node in nlist:
676
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677
                          (node, node_result[constants.NV_NODENETTEST][node]))
678

    
679
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680
    if isinstance(hyp_result, dict):
681
      for hv_name, hv_result in hyp_result.iteritems():
682
        if hv_result is not None:
683
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684
                      (hv_name, hv_result))
685
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if instanceconfig.status != 'down':
710
      if (node_current not in node_instance or
711
          not instance in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if node != node_current:
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758
    """Verify N+1 Memory Resilience.
759

760
    Check that if one single node dies we can still start all the instances it
761
    was primary for.
762

763
    """
764
    bad = False
765

    
766
    for node, nodeinfo in node_info.iteritems():
767
      # This code checks that every node which is now listed as secondary has
768
      # enough memory to host all the instances it is secondary for, should a
769
      # single other node in the cluster fail.
770
      # FIXME: not ready for failover to an arbitrary node
771
      # FIXME: does not support file-backed instances
772
      # WARNING: we currently take into account down instances as well as up
773
      # ones, considering that even if they're down someone might want to start
774
      # them even in the event of a node failure.
775
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776
        needed_mem = 0
777
        for instance in instances:
778
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779
          if bep[constants.BE_AUTO_BALANCE]:
780
            needed_mem += bep[constants.BE_MEMORY]
781
        if nodeinfo['mfree'] < needed_mem:
782
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
783
                      " failovers should node %s fail" % (node, prinode))
784
          bad = True
785
    return bad
786
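  # Worked example for the check above: if node B is secondary for two
  # auto-balanced instances whose primary is node A, configured with 2048MB
  # and 1024MB of memory, then B must report at least 3072MB of free memory,
  # otherwise an N+1 error is flagged for the (B, A) pair.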

    
787
  def CheckPrereq(self):
788
    """Check prerequisites.
789

790
    Transform the list of checks we're going to skip into a set and check that
791
    all its members are valid.
792

793
    """
794
    self.skip_set = frozenset(self.op.skip_checks)
795
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
797

    
798
  def BuildHooksEnv(self):
799
    """Build hooks env.
800

801
    Cluster-Verify hooks just run in the post phase and their failure causes
802
    the output to be logged in the verify output and the verification to fail.
803

804
    """
805
    all_nodes = self.cfg.GetNodeList()
806
    # TODO: populate the environment with useful information for verify hooks
807
    env = {}
808
    return env, [], all_nodes
809

    
810
  def Exec(self, feedback_fn):
811
    """Verify integrity of cluster, performing various test on nodes.
812

813
    """
814
    bad = False
815
    feedback_fn("* Verifying global settings")
816
    for msg in self.cfg.VerifyConfig():
817
      feedback_fn("  - ERROR: %s" % msg)
818

    
819
    vg_name = self.cfg.GetVGName()
820
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
822
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824
    i_non_redundant = [] # Non redundant instances
825
    i_non_a_balanced = [] # Non auto-balanced instances
826
    node_volume = {}
827
    node_instance = {}
828
    node_info = {}
829
    instance_cfg = {}
830

    
831
    # FIXME: verify OS list
832
    # do local checksums
833
    master_files = [constants.CLUSTER_CONF_FILE]
834

    
835
    file_names = ssconf.SimpleStore().GetFileList()
836
    file_names.append(constants.SSL_CERT_FILE)
837
    file_names.extend(master_files)
838

    
839
    local_checksums = utils.FingerprintFiles(file_names)
840

    
841
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842
    node_verify_param = {
843
      constants.NV_FILELIST: file_names,
844
      constants.NV_NODELIST: nodelist,
845
      constants.NV_HYPERVISOR: hypervisors,
846
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847
                                  node.secondary_ip) for node in nodeinfo],
848
      constants.NV_LVLIST: vg_name,
849
      constants.NV_INSTANCELIST: hypervisors,
850
      constants.NV_VGLIST: None,
851
      constants.NV_VERSION: None,
852
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853
      }
854
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855
                                           self.cfg.GetClusterName())
856

    
857
    cluster = self.cfg.GetClusterInfo()
858
    master_node = self.cfg.GetMasterNode()
859
    for node_i in nodeinfo:
860
      node = node_i.name
861
      nresult = all_nvinfo[node].data
862

    
863
      if node == master_node:
864
        ntype = "master"
865
      elif node_i.master_candidate:
866
        ntype = "master candidate"
867
      else:
868
        ntype = "regular"
869
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870

    
871
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
872
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
873
        bad = True
874
        continue
875

    
876
      result = self._VerifyNode(node_i, file_names, local_checksums,
877
                                nresult, feedback_fn, master_files)
878
      bad = bad or result
879

    
880
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881
      if isinstance(lvdata, basestring):
882
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883
                    (node, lvdata.encode('string_escape')))
884
        bad = True
885
        node_volume[node] = {}
886
      elif not isinstance(lvdata, dict):
887
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888
        bad = True
889
        continue
890
      else:
891
        node_volume[node] = lvdata
892

    
893
      # node_instance
894
      idata = nresult.get(constants.NV_INSTANCELIST, None)
895
      if not isinstance(idata, list):
896
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897
                    (node,))
898
        bad = True
899
        continue
900

    
901
      node_instance[node] = idata
902

    
903
      # node_info
904
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
905
      if not isinstance(nodeinfo, dict):
906
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907
        bad = True
908
        continue
909

    
910
      try:
911
        node_info[node] = {
912
          "mfree": int(nodeinfo['memory_free']),
913
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914
          "pinst": [],
915
          "sinst": [],
916
          # dictionary holding all instances this node is secondary for,
917
          # grouped by their primary node. Each key is a cluster node, and each
918
          # value is a list of instances which have the key as primary and the
919
          # current node as secondary.  this is handy to calculate N+1 memory
920
          # availability if you can only failover from a primary to its
921
          # secondary.
922
          "sinst-by-pnode": {},
923
        }
924
      except ValueError:
925
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926
        bad = True
927
        continue
928

    
929
    node_vol_should = {}
930

    
931
    for instance in instancelist:
932
      feedback_fn("* Verifying instance %s" % instance)
933
      inst_config = self.cfg.GetInstanceInfo(instance)
934
      result =  self._VerifyInstance(instance, inst_config, node_volume,
935
                                     node_instance, feedback_fn)
936
      bad = bad or result
937

    
938
      inst_config.MapLVsByNode(node_vol_should)
939

    
940
      instance_cfg[instance] = inst_config
941

    
942
      pnode = inst_config.primary_node
943
      if pnode in node_info:
944
        node_info[pnode]['pinst'].append(instance)
945
      else:
946
        feedback_fn("  - ERROR: instance %s, connection to primary node"
947
                    " %s failed" % (instance, pnode))
948
        bad = True
949

    
950
      # If the instance is non-redundant we cannot survive losing its primary
951
      # node, so we are not N+1 compliant. On the other hand we have no disk
952
      # templates with more than one secondary so that situation is not well
953
      # supported either.
954
      # FIXME: does not support file-backed instances
955
      if len(inst_config.secondary_nodes) == 0:
956
        i_non_redundant.append(instance)
957
      elif len(inst_config.secondary_nodes) > 1:
958
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
959
                    % instance)
960

    
961
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962
        i_non_a_balanced.append(instance)
963

    
964
      for snode in inst_config.secondary_nodes:
965
        if snode in node_info:
966
          node_info[snode]['sinst'].append(instance)
967
          if pnode not in node_info[snode]['sinst-by-pnode']:
968
            node_info[snode]['sinst-by-pnode'][pnode] = []
969
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970
        else:
971
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
972
                      " %s failed" % (instance, snode))
973

    
974
    feedback_fn("* Verifying orphan volumes")
975
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976
                                       feedback_fn)
977
    bad = bad or result
978

    
979
    feedback_fn("* Verifying remaining instances")
980
    result = self._VerifyOrphanInstances(instancelist, node_instance,
981
                                         feedback_fn)
982
    bad = bad or result
983

    
984
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985
      feedback_fn("* Verifying N+1 Memory redundancy")
986
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987
      bad = bad or result
988

    
989
    feedback_fn("* Other Notes")
990
    if i_non_redundant:
991
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992
                  % len(i_non_redundant))
993

    
994
    if i_non_a_balanced:
995
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996
                  % len(i_non_a_balanced))
997

    
998
    return not bad
999

    
1000
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001
    """Analize the post-hooks' result
1002

1003
    This method analyses the hook result, handles it, and sends some
1004
    nicely-formatted feedback back to the user.
1005

1006
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008
    @param hooks_results: the results of the multi-node hooks rpc call
1009
    @param feedback_fn: function used to send feedback back to the caller
1010
    @param lu_result: previous Exec result
1011
    @return: the new Exec result, based on the previous result
1012
        and hook results
1013

1014
    """
1015
    # We only really run POST phase hooks, and are only interested in
1016
    # their results
1017
    if phase == constants.HOOKS_PHASE_POST:
1018
      # Used to change hooks' output to proper indentation
1019
      indent_re = re.compile('^', re.M)
1020
      feedback_fn("* Hooks Results")
1021
      if not hooks_results:
1022
        feedback_fn("  - ERROR: general communication failure")
1023
        lu_result = 1
1024
      else:
1025
        for node_name in hooks_results:
1026
          show_node_header = True
1027
          res = hooks_results[node_name]
1028
          if res.failed or res.data is False or not isinstance(res.data, list):
1029
            feedback_fn("    Communication failure in hooks execution")
1030
            lu_result = 1
1031
            continue
1032
          for script, hkr, output in res.data:
1033
            if hkr == constants.HKR_FAIL:
1034
              # The node header is only shown once, if there are
1035
              # failing hooks on that node
1036
              if show_node_header:
1037
                feedback_fn("  Node %s:" % node_name)
1038
                show_node_header = False
1039
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1040
              output = indent_re.sub('      ', output)
1041
              feedback_fn("%s" % output)
1042
              lu_result = 1
1043

    
1044
      return lu_result
1045

    
1046

    
1047
class LUVerifyDisks(NoHooksLU):
1048
  """Verifies the cluster disks status.
1049

1050
  """
1051
  _OP_REQP = []
1052
  REQ_BGL = False
1053

    
1054
  def ExpandNames(self):
1055
    self.needed_locks = {
1056
      locking.LEVEL_NODE: locking.ALL_SET,
1057
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1058
    }
1059
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060

    
1061
  def CheckPrereq(self):
1062
    """Check prerequisites.
1063

1064
    This has no prerequisites.
1065

1066
    """
1067
    pass
1068

    
1069
  def Exec(self, feedback_fn):
1070
    """Verify integrity of cluster disks.
1071

1072
    """
1073
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074

    
1075
    vg_name = self.cfg.GetVGName()
1076
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1077
    instances = [self.cfg.GetInstanceInfo(name)
1078
                 for name in self.cfg.GetInstanceList()]
1079

    
1080
    nv_dict = {}
1081
    for inst in instances:
1082
      inst_lvs = {}
1083
      if (inst.status != "up" or
1084
          inst.disk_template not in constants.DTS_NET_MIRROR):
1085
        continue
1086
      inst.MapLVsByNode(inst_lvs)
1087
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088
      for node, vol_list in inst_lvs.iteritems():
1089
        for vol in vol_list:
1090
          nv_dict[(node, vol)] = inst
1091

    
1092
    if not nv_dict:
1093
      return result
1094

    
1095
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096

    
1097
    to_act = set()
1098
    for node in nodes:
1099
      # node_volume
1100
      lvs = node_lvs[node]
1101
      if lvs.failed:
1102
        self.LogWarning("Connection to node %s failed: %s" %
1103
                        (node, lvs.data))
1104
        continue
1105
      lvs = lvs.data
1106
      if isinstance(lvs, basestring):
1107
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108
        res_nlvm[node] = lvs
1109
      elif not isinstance(lvs, dict):
1110
        logging.warning("Connection to node %s failed or invalid data"
1111
                        " returned", node)
1112
        res_nodes.append(node)
1113
        continue
1114

    
1115
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116
        inst = nv_dict.pop((node, lv_name), None)
1117
        if (not lv_online and inst is not None
1118
            and inst.name not in res_instances):
1119
          res_instances.append(inst.name)
1120

    
1121
    # any leftover items in nv_dict are missing LVs, let's arrange the
1122
    # data better
1123
    for key, inst in nv_dict.iteritems():
1124
      if inst.name not in res_missing:
1125
        res_missing[inst.name] = []
1126
      res_missing[inst.name].append(key)
1127

    
1128
    return result
1129

    
1130

    
1131
class LURenameCluster(LogicalUnit):
1132
  """Rename the cluster.
1133

1134
  """
1135
  HPATH = "cluster-rename"
1136
  HTYPE = constants.HTYPE_CLUSTER
1137
  _OP_REQP = ["name"]
1138

    
1139
  def BuildHooksEnv(self):
1140
    """Build hooks env.
1141

1142
    """
1143
    env = {
1144
      "OP_TARGET": self.cfg.GetClusterName(),
1145
      "NEW_NAME": self.op.name,
1146
      }
1147
    mn = self.cfg.GetMasterNode()
1148
    return env, [mn], [mn]
1149

    
1150
  def CheckPrereq(self):
1151
    """Verify that the passed name is a valid one.
1152

1153
    """
1154
    hostname = utils.HostInfo(self.op.name)
1155

    
1156
    new_name = hostname.name
1157
    self.ip = new_ip = hostname.ip
1158
    old_name = self.cfg.GetClusterName()
1159
    old_ip = self.cfg.GetMasterIP()
1160
    if new_name == old_name and new_ip == old_ip:
1161
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162
                                 " cluster has changed")
1163
    if new_ip != old_ip:
1164
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166
                                   " reachable on the network. Aborting." %
1167
                                   new_ip)
1168

    
1169
    self.op.name = new_name
1170

    
1171
  def Exec(self, feedback_fn):
1172
    """Rename the cluster.
1173

1174
    """
1175
    clustername = self.op.name
1176
    ip = self.ip
1177

    
1178
    # shutdown the master IP
1179
    master = self.cfg.GetMasterNode()
1180
    result = self.rpc.call_node_stop_master(master, False)
1181
    if result.failed or not result.data:
1182
      raise errors.OpExecError("Could not disable the master role")
1183

    
1184
    try:
1185
      cluster = self.cfg.GetClusterInfo()
1186
      cluster.cluster_name = clustername
1187
      cluster.master_ip = ip
1188
      self.cfg.Update(cluster)
1189

    
1190
      # update the known hosts file
1191
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192
      node_list = self.cfg.GetNodeList()
1193
      try:
1194
        node_list.remove(master)
1195
      except ValueError:
1196
        pass
1197
      result = self.rpc.call_upload_file(node_list,
1198
                                         constants.SSH_KNOWN_HOSTS_FILE)
1199
      for to_node, to_result in result.iteritems():
1200
        if to_result.failed or not to_result.data:
1201
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1202

    
1203
    finally:
1204
      result = self.rpc.call_node_start_master(master, False)
1205
      if result.failed or not result.data:
1206
        self.LogWarning("Could not re-enable the master role on"
1207
                        " the master, please restart manually.")
1208

    
1209

    
1210
def _RecursiveCheckIfLVMBased(disk):
1211
  """Check if the given disk or its children are lvm-based.
1212

1213
  @type disk: L{objects.Disk}
1214
  @param disk: the disk to check
1215
  @rtype: boolean
1216
  @return: boolean indicating whether an LD_LV dev_type was found or not
1217

1218
  """
1219
  if disk.children:
1220
    for chdisk in disk.children:
1221
      if _RecursiveCheckIfLVMBased(chdisk):
1222
        return True
1223
  return disk.dev_type == constants.LD_LV
1224

    
1225

    
1226
class LUSetClusterParams(LogicalUnit):
1227
  """Change the parameters of the cluster.
1228

1229
  """
1230
  HPATH = "cluster-modify"
1231
  HTYPE = constants.HTYPE_CLUSTER
1232
  _OP_REQP = []
1233
  REQ_BGL = False
1234

    
1235
  def CheckArguments(self):
1236
    """Check parameters
1237

1238
    """
1239
    if not hasattr(self.op, "candidate_pool_size"):
1240
      self.op.candidate_pool_size = None
1241
    if self.op.candidate_pool_size is not None:
1242
      try:
1243
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244
      except ValueError, err:
1245
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246
                                   str(err))
1247
      if self.op.candidate_pool_size < 1:
1248
        raise errors.OpPrereqError("At least one master candidate needed")
1249

    
1250
  def ExpandNames(self):
1251
    # FIXME: in the future maybe other cluster params won't require checking on
1252
    # all nodes to be modified.
1253
    self.needed_locks = {
1254
      locking.LEVEL_NODE: locking.ALL_SET,
1255
    }
1256
    self.share_locks[locking.LEVEL_NODE] = 1
1257

    
1258
  def BuildHooksEnv(self):
1259
    """Build hooks env.
1260

1261
    """
1262
    env = {
1263
      "OP_TARGET": self.cfg.GetClusterName(),
1264
      "NEW_VG_NAME": self.op.vg_name,
1265
      }
1266
    mn = self.cfg.GetMasterNode()
1267
    return env, [mn], [mn]
1268

    
1269
  def CheckPrereq(self):
1270
    """Check prerequisites.
1271

1272
    This checks whether the given params don't conflict and
1273
    if the given volume group is valid.
1274

1275
    """
1276
    # FIXME: This only works because there is only one parameter that can be
1277
    # changed or removed.
1278
    if self.op.vg_name is not None and not self.op.vg_name:
1279
      instances = self.cfg.GetAllInstancesInfo().values()
1280
      for inst in instances:
1281
        for disk in inst.disks:
1282
          if _RecursiveCheckIfLVMBased(disk):
1283
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1284
                                       " lvm-based instances exist")
1285

    
1286
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1287

    
1288
    # if vg_name not None, checks given volume group on all nodes
1289
    if self.op.vg_name:
1290
      vglist = self.rpc.call_vg_list(node_list)
1291
      for node in node_list:
1292
        if vglist[node].failed:
1293
          # ignoring down node
1294
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295
          continue
1296
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297
                                              self.op.vg_name,
1298
                                              constants.MIN_VG_SIZE)
1299
        if vgstatus:
1300
          raise errors.OpPrereqError("Error on node '%s': %s" %
1301
                                     (node, vgstatus))
1302

    
1303
    self.cluster = cluster = self.cfg.GetClusterInfo()
1304
    # validate beparams changes
1305
    if self.op.beparams:
1306
      utils.CheckBEParams(self.op.beparams)
1307
      self.new_beparams = cluster.FillDict(
1308
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309

    
1310
    # hypervisor list/parameters
1311
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312
    if self.op.hvparams:
1313
      if not isinstance(self.op.hvparams, dict):
1314
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315
      for hv_name, hv_dict in self.op.hvparams.items():
1316
        if hv_name not in self.new_hvparams:
1317
          self.new_hvparams[hv_name] = hv_dict
1318
        else:
1319
          self.new_hvparams[hv_name].update(hv_dict)
1320

    
1321
    if self.op.enabled_hypervisors is not None:
1322
      self.hv_list = self.op.enabled_hypervisors
1323
    else:
1324
      self.hv_list = cluster.enabled_hypervisors
1325

    
1326
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327
      # either the enabled list has changed, or the parameters have, validate
1328
      for hv_name, hv_params in self.new_hvparams.items():
1329
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330
            (self.op.enabled_hypervisors and
1331
             hv_name in self.op.enabled_hypervisors)):
1332
          # either this is a new hypervisor, or its parameters have changed
1333
          hv_class = hypervisor.GetHypervisor(hv_name)
1334
          hv_class.CheckParameterSyntax(hv_params)
1335
          _CheckHVParams(self, node_list, hv_name, hv_params)
1336

    
1337
  def Exec(self, feedback_fn):
1338
    """Change the parameters of the cluster.
1339

1340
    """
1341
    if self.op.vg_name is not None:
1342
      if self.op.vg_name != self.cfg.GetVGName():
1343
        self.cfg.SetVGName(self.op.vg_name)
1344
      else:
1345
        feedback_fn("Cluster LVM configuration already in desired"
1346
                    " state, not changing")
1347
    if self.op.hvparams:
1348
      self.cluster.hvparams = self.new_hvparams
1349
    if self.op.enabled_hypervisors is not None:
1350
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351
    if self.op.beparams:
1352
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353
    if self.op.candidate_pool_size is not None:
1354
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355

    
1356
    self.cfg.Update(self.cluster)
1357

    
1358
    # we want to update nodes after the cluster so that if any errors
1359
    # happen, we have recorded and saved the cluster info
1360
    if self.op.candidate_pool_size is not None:
1361
      node_info = self.cfg.GetAllNodesInfo().values()
1362
      num_candidates = len([node for node in node_info
1363
                            if node.master_candidate])
1364
      num_nodes = len(node_info)
1365
      if num_candidates < self.op.candidate_pool_size:
1366
        random.shuffle(node_info)
1367
        for node in node_info:
1368
          if num_candidates >= self.op.candidate_pool_size:
1369
            break
1370
          if node.master_candidate:
1371
            continue
1372
          node.master_candidate = True
1373
          self.LogInfo("Promoting node %s to master candidate", node.name)
1374
          self.cfg.Update(node)
1375
          self.context.ReaddNode(node)
1376
          num_candidates += 1
1377
      elif num_candidates > self.op.candidate_pool_size:
1378
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379
                     " of candidate_pool_size (%d)" %
1380
                     (num_candidates, self.op.candidate_pool_size))
1381

    
1382

    
1383
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384
  """Sleep and poll for an instance's disk to sync.
1385

1386
  """
1387
  if not instance.disks:
1388
    return True
1389

    
1390
  if not oneshot:
1391
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392

    
1393
  node = instance.primary_node
1394

    
1395
  for dev in instance.disks:
1396
    lu.cfg.SetDiskID(dev, node)
1397

    
1398
  retries = 0
1399
  while True:
1400
    max_time = 0
1401
    done = True
1402
    cumul_degraded = False
1403
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404
    if rstats.failed or not rstats.data:
1405
      lu.LogWarning("Can't get any data from node %s", node)
1406
      retries += 1
1407
      if retries >= 10:
1408
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1409
                                 " aborting." % node)
1410
      time.sleep(6)
1411
      continue
1412
    rstats = rstats.data
1413
    retries = 0
1414
    for i in range(len(rstats)):
1415
      mstat = rstats[i]
1416
      if mstat is None:
1417
        lu.LogWarning("Can't compute data for node %s/%s",
1418
                           node, instance.disks[i].iv_name)
1419
        continue
1420
      # we ignore the ldisk parameter
1421
      perc_done, est_time, is_degraded, _ = mstat
1422
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423
      if perc_done is not None:
1424
        done = False
1425
        if est_time is not None:
1426
          rem_time = "%d estimated seconds remaining" % est_time
1427
          max_time = est_time
1428
        else:
1429
          rem_time = "no time estimate"
1430
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431
                        (instance.disks[i].iv_name, perc_done, rem_time))
1432
    if done or oneshot:
1433
      break
1434

    
1435
    time.sleep(min(60, max_time))
1436

    
1437
  if done:
1438
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439
  return not cumul_degraded
1440

    
1441

    
1442
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


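# Illustrative usage only, not part of the original module: callers that are
# interested in the local storage state of a device (rather than the overall
# mirror state) pass ldisk=True, which makes the check read index 6 instead
# of index 5 of the call_blockdev_find() result.
#
#   primary_ok = _CheckDiskConsistency(lu, dev, instance.primary_node,
#                                      on_primary=True, ldisk=True)
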
class LUDiagnoseOS(NoHooksLU):
1472
  """Logical unit for OS diagnose/query.
1473

1474
  """
1475
  _OP_REQP = ["output_fields", "names"]
1476
  REQ_BGL = False
1477
  _FIELDS_STATIC = utils.FieldSet()
1478
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479

    
1480
  def ExpandNames(self):
1481
    if self.op.names:
1482
      raise errors.OpPrereqError("Selective OS query not supported")
1483

    
1484
    _CheckOutputFields(static=self._FIELDS_STATIC,
1485
                       dynamic=self._FIELDS_DYNAMIC,
1486
                       selected=self.op.output_fields)
1487

    
1488
    # Lock all nodes, in shared mode
1489
    self.needed_locks = {}
1490
    self.share_locks[locking.LEVEL_NODE] = 1
1491
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492

    
1493
  def CheckPrereq(self):
1494
    """Check prerequisites.
1495

1496
    """
1497

    
1498
  @staticmethod
1499
  def _DiagnoseByOS(node_list, rlist):
1500
    """Remaps a per-node return list into an a per-os per-node dictionary
1501

1502
    @param node_list: a list with the names of all nodes
1503
    @param rlist: a map with node names as keys and OS objects as values
1504

1505
    @rtype: dict
1506
    @returns: a dictionary with osnames as keys and as value another map, with
1507
        nodes as keys and list of OS objects as values, eg::
1508

1509
          {"debian-etch": {"node1": [<object>,...],
1510
                           "node2": [<object>,]}
1511
          }
1512

1513
    """
1514
    all_os = {}
1515
    for node_name, nr in rlist.iteritems():
1516
      if nr.failed or not nr.data:
1517
        continue
1518
      for os_obj in nr.data:
1519
        if os_obj.name not in all_os:
1520
          # build a list of nodes for this os containing empty lists
1521
          # for each node in node_list
1522
          all_os[os_obj.name] = {}
1523
          for nname in node_list:
1524
            all_os[os_obj.name][nname] = []
1525
        all_os[os_obj.name][node_name].append(os_obj)
1526
    return all_os
1527

    
1528
  def Exec(self, feedback_fn):
1529
    """Compute the list of OSes.
1530

1531
    """
1532
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1533
    node_data = self.rpc.call_os_diagnose(node_list)
1534
    if node_data == False:
1535
      raise errors.OpExecError("Can't gather the list of OSes")
1536
    pol = self._DiagnoseByOS(node_list, node_data)
1537
    output = []
1538
    for os_name, os_data in pol.iteritems():
1539
      row = []
1540
      for field in self.op.output_fields:
1541
        if field == "name":
1542
          val = os_name
1543
        elif field == "valid":
1544
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1545
        elif field == "node_status":
1546
          val = {}
1547
          for node_name, nos_list in os_data.iteritems():
1548
            val[node_name] = [(v.status, v.path) for v in nos_list]
1549
        else:
1550
          raise errors.ParameterError(field)
1551
        row.append(val)
1552
      output.append(row)
1553

    
1554
    return output
1555

    
1556

    
1557
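# Shape of the rows returned by LUDiagnoseOS.Exec() above, shown only as an
# illustration (the OS name, status and path values are made up): one row
# per OS, with the columns in the order requested via output_fields.
#
#   output_fields = ["name", "valid", "node_status"]
#   output == [
#     ["debian-etch", True, {"node1": [("ok", "/srv/ganeti/os")]}],
#   ]
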
class LURemoveNode(LogicalUnit):
1558
  """Logical unit for removing a node.
1559

1560
  """
1561
  HPATH = "node-remove"
1562
  HTYPE = constants.HTYPE_NODE
1563
  _OP_REQP = ["node_name"]
1564

    
1565
  def BuildHooksEnv(self):
1566
    """Build hooks env.
1567

1568
    This doesn't run on the target node in the pre phase as a failed
1569
    node would then be impossible to remove.
1570

1571
    """
1572
    env = {
1573
      "OP_TARGET": self.op.node_name,
1574
      "NODE_NAME": self.op.node_name,
1575
      }
1576
    all_nodes = self.cfg.GetNodeList()
1577
    all_nodes.remove(self.op.node_name)
1578
    return env, all_nodes, all_nodes
1579

    
1580
  def CheckPrereq(self):
1581
    """Check prerequisites.
1582

1583
    This checks:
1584
     - the node exists in the configuration
1585
     - it does not have primary or secondary instances
1586
     - it's not the master
1587

1588
    Any errors are signalled by raising errors.OpPrereqError.
1589

1590
    """
1591
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1592
    if node is None:
1593
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1594

    
1595
    instance_list = self.cfg.GetInstanceList()
1596

    
1597
    masternode = self.cfg.GetMasterNode()
1598
    if node.name == masternode:
1599
      raise errors.OpPrereqError("Node is the master node,"
1600
                                 " you need to failover first.")
1601

    
1602
    for instance_name in instance_list:
1603
      instance = self.cfg.GetInstanceInfo(instance_name)
1604
      if node.name == instance.primary_node:
1605
        raise errors.OpPrereqError("Instance %s still running on the node,"
1606
                                   " please remove first." % instance_name)
1607
      if node.name in instance.secondary_nodes:
1608
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609
                                   " please remove first." % instance_name)
1610
    self.op.node_name = node.name
1611
    self.node = node
1612

    
1613
  def Exec(self, feedback_fn):
1614
    """Removes the node from the cluster.
1615

1616
    """
1617
    node = self.node
1618
    logging.info("Stopping the node daemon and removing configs from node %s",
1619
                 node.name)
1620

    
1621
    self.context.RemoveNode(node.name)
1622

    
1623
    self.rpc.call_node_leave_cluster(node.name)
1624

    
1625

    
1626
class LUQueryNodes(NoHooksLU):
1627
  """Logical unit for querying nodes.
1628

1629
  """
1630
  _OP_REQP = ["output_fields", "names"]
1631
  REQ_BGL = False
1632
  _FIELDS_DYNAMIC = utils.FieldSet(
1633
    "dtotal", "dfree",
1634
    "mtotal", "mnode", "mfree",
1635
    "bootid",
1636
    "ctotal",
1637
    )
1638

    
1639
  _FIELDS_STATIC = utils.FieldSet(
1640
    "name", "pinst_cnt", "sinst_cnt",
1641
    "pinst_list", "sinst_list",
1642
    "pip", "sip", "tags",
1643
    "serial_no",
1644
    "master_candidate",
1645
    "master",
1646
    )
1647

    
1648
  def ExpandNames(self):
1649
    _CheckOutputFields(static=self._FIELDS_STATIC,
1650
                       dynamic=self._FIELDS_DYNAMIC,
1651
                       selected=self.op.output_fields)
1652

    
1653
    self.needed_locks = {}
1654
    self.share_locks[locking.LEVEL_NODE] = 1
1655

    
1656
    if self.op.names:
1657
      self.wanted = _GetWantedNodes(self, self.op.names)
1658
    else:
1659
      self.wanted = locking.ALL_SET
1660

    
1661
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1662
    if self.do_locking:
1663
      # if we don't request only static fields, we need to lock the nodes
1664
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1665

    
1666

    
1667
  def CheckPrereq(self):
1668
    """Check prerequisites.
1669

1670
    """
1671
    # The validation of the node list is done in the _GetWantedNodes,
1672
    # if non empty, and if empty, there's no validation to do
1673
    pass
1674

    
1675
  def Exec(self, feedback_fn):
1676
    """Computes the list of nodes and their attributes.
1677

1678
    """
1679
    all_info = self.cfg.GetAllNodesInfo()
1680
    if self.do_locking:
1681
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1682
    elif self.wanted != locking.ALL_SET:
1683
      nodenames = self.wanted
1684
      missing = set(nodenames).difference(all_info.keys())
1685
      if missing:
1686
        raise errors.OpExecError(
1687
          "Some nodes were removed before retrieving their data: %s" % missing)
1688
    else:
1689
      nodenames = all_info.keys()
1690

    
1691
    nodenames = utils.NiceSort(nodenames)
1692
    nodelist = [all_info[name] for name in nodenames]
1693

    
1694
    # begin data gathering
1695

    
1696
    if self.do_locking:
1697
      live_data = {}
1698
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1699
                                          self.cfg.GetHypervisorType())
1700
      for name in nodenames:
1701
        nodeinfo = node_data[name]
1702
        if not nodeinfo.failed and nodeinfo.data:
1703
          nodeinfo = nodeinfo.data
1704
          fn = utils.TryConvert
1705
          live_data[name] = {
1706
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1707
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1708
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1709
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1710
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1711
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1712
            "bootid": nodeinfo.get('bootid', None),
1713
            }
1714
        else:
1715
          live_data[name] = {}
1716
    else:
1717
      live_data = dict.fromkeys(nodenames, {})
1718

    
1719
    node_to_primary = dict([(name, set()) for name in nodenames])
1720
    node_to_secondary = dict([(name, set()) for name in nodenames])
1721

    
1722
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1723
                             "sinst_cnt", "sinst_list"))
1724
    if inst_fields & frozenset(self.op.output_fields):
1725
      instancelist = self.cfg.GetInstanceList()
1726

    
1727
      for instance_name in instancelist:
1728
        inst = self.cfg.GetInstanceInfo(instance_name)
1729
        if inst.primary_node in node_to_primary:
1730
          node_to_primary[inst.primary_node].add(inst.name)
1731
        for secnode in inst.secondary_nodes:
1732
          if secnode in node_to_secondary:
1733
            node_to_secondary[secnode].add(inst.name)
1734

    
1735
    master_node = self.cfg.GetMasterNode()
1736

    
1737
    # end data gathering
1738

    
1739
    output = []
1740
    for node in nodelist:
1741
      node_output = []
1742
      for field in self.op.output_fields:
1743
        if field == "name":
1744
          val = node.name
1745
        elif field == "pinst_list":
1746
          val = list(node_to_primary[node.name])
1747
        elif field == "sinst_list":
1748
          val = list(node_to_secondary[node.name])
1749
        elif field == "pinst_cnt":
1750
          val = len(node_to_primary[node.name])
1751
        elif field == "sinst_cnt":
1752
          val = len(node_to_secondary[node.name])
1753
        elif field == "pip":
1754
          val = node.primary_ip
1755
        elif field == "sip":
1756
          val = node.secondary_ip
1757
        elif field == "tags":
1758
          val = list(node.GetTags())
1759
        elif field == "serial_no":
1760
          val = node.serial_no
1761
        elif field == "master_candidate":
1762
          val = node.master_candidate
1763
        elif field == "master":
1764
          val = node.name == master_node
1765
        elif self._FIELDS_DYNAMIC.Matches(field):
1766
          val = live_data[node.name].get(field, None)
1767
        else:
1768
          raise errors.ParameterError(field)
1769
        node_output.append(val)
1770
      output.append(node_output)
1771

    
1772
    return output
1773

    
1774

    
1775
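# Illustrative note, not part of the original module: LUQueryNodes above only
# locks the nodes and issues the node_info RPC when at least one requested
# field is dynamic; a purely static query is answered from the configuration.
# The opcode below is assumed to be the one matching this LU.
#
#   op = opcodes.OpQueryNodes(output_fields=["name", "pinst_cnt", "mfree"],
#                             names=[])
#   # "mfree" is dynamic, so the nodes are locked and queried live.
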
class LUQueryNodeVolumes(NoHooksLU):
1776
  """Logical unit for getting volumes on node(s).
1777

1778
  """
1779
  _OP_REQP = ["nodes", "output_fields"]
1780
  REQ_BGL = False
1781
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1782
  _FIELDS_STATIC = utils.FieldSet("node")
1783

    
1784
  def ExpandNames(self):
1785
    _CheckOutputFields(static=self._FIELDS_STATIC,
1786
                       dynamic=self._FIELDS_DYNAMIC,
1787
                       selected=self.op.output_fields)
1788

    
1789
    self.needed_locks = {}
1790
    self.share_locks[locking.LEVEL_NODE] = 1
1791
    if not self.op.nodes:
1792
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1793
    else:
1794
      self.needed_locks[locking.LEVEL_NODE] = \
1795
        _GetWantedNodes(self, self.op.nodes)
1796

    
1797
  def CheckPrereq(self):
1798
    """Check prerequisites.
1799

1800
    This checks that the fields required are valid output fields.
1801

1802
    """
1803
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1804

    
1805
  def Exec(self, feedback_fn):
1806
    """Computes the list of nodes and their attributes.
1807

1808
    """
1809
    nodenames = self.nodes
1810
    volumes = self.rpc.call_node_volumes(nodenames)
1811

    
1812
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1813
             in self.cfg.GetInstanceList()]
1814

    
1815
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1816

    
1817
    output = []
1818
    for node in nodenames:
1819
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1820
        continue
1821

    
1822
      node_vols = volumes[node].data[:]
1823
      node_vols.sort(key=lambda vol: vol['dev'])
1824

    
1825
      for vol in node_vols:
1826
        node_output = []
1827
        for field in self.op.output_fields:
1828
          if field == "node":
1829
            val = node
1830
          elif field == "phys":
1831
            val = vol['dev']
1832
          elif field == "vg":
1833
            val = vol['vg']
1834
          elif field == "name":
1835
            val = vol['name']
1836
          elif field == "size":
1837
            val = int(float(vol['size']))
1838
          elif field == "instance":
1839
            for inst in ilist:
1840
              if node not in lv_by_node[inst]:
1841
                continue
1842
              if vol['name'] in lv_by_node[inst][node]:
1843
                val = inst.name
1844
                break
1845
            else:
1846
              val = '-'
1847
          else:
1848
            raise errors.ParameterError(field)
1849
          node_output.append(str(val))
1850

    
1851
        output.append(node_output)
1852

    
1853
    return output
1854

    
1855

    
1856
class LUAddNode(LogicalUnit):
1857
  """Logical unit for adding node to the cluster.
1858

1859
  """
1860
  HPATH = "node-add"
1861
  HTYPE = constants.HTYPE_NODE
1862
  _OP_REQP = ["node_name"]
1863

    
1864
  def BuildHooksEnv(self):
1865
    """Build hooks env.
1866

1867
    This will run on all nodes before, and on all nodes + the new node after.
1868

1869
    """
1870
    env = {
1871
      "OP_TARGET": self.op.node_name,
1872
      "NODE_NAME": self.op.node_name,
1873
      "NODE_PIP": self.op.primary_ip,
1874
      "NODE_SIP": self.op.secondary_ip,
1875
      }
1876
    nodes_0 = self.cfg.GetNodeList()
1877
    nodes_1 = nodes_0 + [self.op.node_name, ]
1878
    return env, nodes_0, nodes_1
1879

    
1880
  def CheckPrereq(self):
1881
    """Check prerequisites.
1882

1883
    This checks:
1884
     - the new node is not already in the config
1885
     - it is resolvable
1886
     - its parameters (single/dual homed) match the cluster
1887

1888
    Any errors are signalled by raising errors.OpPrereqError.
1889

1890
    """
1891
    node_name = self.op.node_name
1892
    cfg = self.cfg
1893

    
1894
    dns_data = utils.HostInfo(node_name)
1895

    
1896
    node = dns_data.name
1897
    primary_ip = self.op.primary_ip = dns_data.ip
1898
    secondary_ip = getattr(self.op, "secondary_ip", None)
1899
    if secondary_ip is None:
1900
      secondary_ip = primary_ip
1901
    if not utils.IsValidIP(secondary_ip):
1902
      raise errors.OpPrereqError("Invalid secondary IP given")
1903
    self.op.secondary_ip = secondary_ip
1904

    
1905
    node_list = cfg.GetNodeList()
1906
    if not self.op.readd and node in node_list:
1907
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1908
                                 node)
1909
    elif self.op.readd and node not in node_list:
1910
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1911

    
1912
    for existing_node_name in node_list:
1913
      existing_node = cfg.GetNodeInfo(existing_node_name)
1914

    
1915
      if self.op.readd and node == existing_node_name:
1916
        if (existing_node.primary_ip != primary_ip or
1917
            existing_node.secondary_ip != secondary_ip):
1918
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1919
                                     " address configuration as before")
1920
        continue
1921

    
1922
      if (existing_node.primary_ip == primary_ip or
1923
          existing_node.secondary_ip == primary_ip or
1924
          existing_node.primary_ip == secondary_ip or
1925
          existing_node.secondary_ip == secondary_ip):
1926
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1927
                                   " existing node %s" % existing_node.name)
1928

    
1929
    # check that the type of the node (single versus dual homed) is the
1930
    # same as for the master
1931
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1932
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1933
    newbie_singlehomed = secondary_ip == primary_ip
1934
    if master_singlehomed != newbie_singlehomed:
1935
      if master_singlehomed:
1936
        raise errors.OpPrereqError("The master has no private ip but the"
1937
                                   " new node has one")
1938
      else:
1939
        raise errors.OpPrereqError("The master has a private ip but the"
1940
                                   " new node doesn't have one")
1941

    
1942
    # checks reachability
1943
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1944
      raise errors.OpPrereqError("Node not reachable by ping")
1945

    
1946
    if not newbie_singlehomed:
1947
      # check reachability from my secondary ip to newbie's secondary ip
1948
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1949
                           source=myself.secondary_ip):
1950
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1951
                                   " based ping to noded port")
1952

    
1953
    self.new_node = objects.Node(name=node,
1954
                                 primary_ip=primary_ip,
1955
                                 secondary_ip=secondary_ip)
1956

    
1957
  def Exec(self, feedback_fn):
1958
    """Adds the new node to the cluster.
1959

1960
    """
1961
    new_node = self.new_node
1962
    node = new_node.name
1963

    
1964
    # check connectivity
1965
    result = self.rpc.call_version([node])[node]
1966
    result.Raise()
1967
    if result.data:
1968
      if constants.PROTOCOL_VERSION == result.data:
1969
        logging.info("Communication to node %s fine, sw version %s match",
1970
                     node, result.data)
1971
      else:
1972
        raise errors.OpExecError("Version mismatch master version %s,"
1973
                                 " node version %s" %
1974
                                 (constants.PROTOCOL_VERSION, result.data))
1975
    else:
1976
      raise errors.OpExecError("Cannot get version from the new node")
1977

    
1978
    # setup ssh on node
1979
    logging.info("Copy ssh key to node %s", node)
1980
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1981
    keyarray = []
1982
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1983
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1984
                priv_key, pub_key]
1985

    
1986
    for i in keyfiles:
1987
      f = open(i, 'r')
1988
      try:
1989
        keyarray.append(f.read())
1990
      finally:
1991
        f.close()
1992

    
1993
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1994
                                    keyarray[2],
1995
                                    keyarray[3], keyarray[4], keyarray[5])
1996

    
1997
    if result.failed or not result.data:
1998
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1999

    
2000
    # Add node to our /etc/hosts, and add key to known_hosts
2001
    utils.AddHostToEtcHosts(new_node.name)
2002

    
2003
    if new_node.secondary_ip != new_node.primary_ip:
2004
      result = self.rpc.call_node_has_ip_address(new_node.name,
2005
                                                 new_node.secondary_ip)
2006
      if result.failed or not result.data:
2007
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2008
                                 " you gave (%s). Please fix and re-run this"
2009
                                 " command." % new_node.secondary_ip)
2010

    
2011
    node_verify_list = [self.cfg.GetMasterNode()]
2012
    node_verify_param = {
2013
      'nodelist': [node],
2014
      # TODO: do a node-net-test as well?
2015
    }
2016

    
2017
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2018
                                       self.cfg.GetClusterName())
2019
    for verifier in node_verify_list:
2020
      if result[verifier].failed or not result[verifier].data:
2021
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2022
                                 " for remote verification" % verifier)
2023
      if result[verifier].data['nodelist']:
2024
        for failed in result[verifier].data['nodelist']:
2025
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2026
                      (verifier, result[verifier].data['nodelist'][failed]))
2027
        raise errors.OpExecError("ssh/hostname verification failed.")
2028

    
2029
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2030
    # including the node just added
2031
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2032
    dist_nodes = self.cfg.GetNodeList()
2033
    if not self.op.readd:
2034
      dist_nodes.append(node)
2035
    if myself.name in dist_nodes:
2036
      dist_nodes.remove(myself.name)
2037

    
2038
    logging.debug("Copying hosts and known_hosts to all nodes")
2039
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2040
      result = self.rpc.call_upload_file(dist_nodes, fname)
2041
      for to_node, to_result in result.iteritems():
2042
        if to_result.failed or not to_result.data:
2043
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2044

    
2045
    to_copy = []
2046
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2047
      to_copy.append(constants.VNC_PASSWORD_FILE)
2048
    for fname in to_copy:
2049
      result = self.rpc.call_upload_file([node], fname)
2050
      if result[node].failed or not result[node]:
2051
        logging.error("Could not copy file %s to node %s", fname, node)
2052

    
2053
    if self.op.readd:
2054
      self.context.ReaddNode(new_node)
2055
    else:
2056
      self.context.AddNode(new_node)
2057

    
2058

    
2059
class LUSetNodeParams(LogicalUnit):
2060
  """Modifies the parameters of a node.
2061

2062
  """
2063
  HPATH = "node-modify"
2064
  HTYPE = constants.HTYPE_NODE
2065
  _OP_REQP = ["node_name"]
2066
  REQ_BGL = False
2067

    
2068
  def CheckArguments(self):
2069
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2070
    if node_name is None:
2071
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2072
    self.op.node_name = node_name
2073
    if not hasattr(self.op, 'master_candidate'):
2074
      raise errors.OpPrereqError("Please pass at least one modification")
2075
    self.op.master_candidate = bool(self.op.master_candidate)
2076

    
2077
  def ExpandNames(self):
2078
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2079

    
2080
  def BuildHooksEnv(self):
2081
    """Build hooks env.
2082

2083
    This runs on the master node.
2084

2085
    """
2086
    env = {
2087
      "OP_TARGET": self.op.node_name,
2088
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2089
      }
2090
    nl = [self.cfg.GetMasterNode(),
2091
          self.op.node_name]
2092
    return env, nl, nl
2093

    
2094
  def CheckPrereq(self):
2095
    """Check prerequisites.
2096

2097
    This only checks the instance list against the existing names.
2098

2099
    """
2100
    force = self.force = self.op.force
2101

    
2102
    if self.op.master_candidate == False:
2103
      if self.op.node_name == self.cfg.GetMasterNode():
2104
        raise errors.OpPrereqError("The master node has to be a"
2105
                                   " master candidate")
2106
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2107
      node_info = self.cfg.GetAllNodesInfo().values()
2108
      num_candidates = len([node for node in node_info
2109
                            if node.master_candidate])
2110
      if num_candidates <= cp_size:
2111
        msg = ("Not enough master candidates (desired"
2112
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2113
        if force:
2114
          self.LogWarning(msg)
2115
        else:
2116
          raise errors.OpPrereqError(msg)
2117

    
2118
    return
2119

    
2120
  def Exec(self, feedback_fn):
2121
    """Modifies a node.
2122

2123
    """
2124
    node = self.cfg.GetNodeInfo(self.op.node_name)
2125

    
2126
    result = []
2127

    
2128
    if self.op.master_candidate is not None:
2129
      node.master_candidate = self.op.master_candidate
2130
      result.append(("master_candidate", str(self.op.master_candidate)))
2131

    
2132
    # this will trigger configuration file update, if needed
2133
    self.cfg.Update(node)
2134
    # this will trigger job queue propagation or cleanup
2135
    if self.op.node_name != self.cfg.GetMasterNode():
2136
      self.context.ReaddNode(node)
2137

    
2138
    return result
2139

    
2140

    
2141
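# Illustrative usage only, not part of the original module: demoting a node
# from master candidate status through LUSetNodeParams above.  The opcode
# name is an assumption (the one matching this LU); with force=False the
# candidate pool size check above fails instead of merely warning.
#
#   op = opcodes.OpSetNodeParams(node_name="node3.example.com",
#                                master_candidate=False, force=False)
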
class LUQueryClusterInfo(NoHooksLU):
2142
  """Query cluster configuration.
2143

2144
  """
2145
  _OP_REQP = []
2146
  REQ_BGL = False
2147

    
2148
  def ExpandNames(self):
2149
    self.needed_locks = {}
2150

    
2151
  def CheckPrereq(self):
2152
    """No prerequsites needed for this LU.
2153

2154
    """
2155
    pass
2156

    
2157
  def Exec(self, feedback_fn):
2158
    """Return cluster config.
2159

2160
    """
2161
    cluster = self.cfg.GetClusterInfo()
2162
    result = {
2163
      "software_version": constants.RELEASE_VERSION,
2164
      "protocol_version": constants.PROTOCOL_VERSION,
2165
      "config_version": constants.CONFIG_VERSION,
2166
      "os_api_version": constants.OS_API_VERSION,
2167
      "export_version": constants.EXPORT_VERSION,
2168
      "architecture": (platform.architecture()[0], platform.machine()),
2169
      "name": cluster.cluster_name,
2170
      "master": cluster.master_node,
2171
      "default_hypervisor": cluster.default_hypervisor,
2172
      "enabled_hypervisors": cluster.enabled_hypervisors,
2173
      "hvparams": cluster.hvparams,
2174
      "beparams": cluster.beparams,
2175
      "candidate_pool_size": cluster.candidate_pool_size,
2176
      }
2177

    
2178
    return result
2179

    
2180

    
2181
class LUQueryConfigValues(NoHooksLU):
2182
  """Return configuration values.
2183

2184
  """
2185
  _OP_REQP = []
2186
  REQ_BGL = False
2187
  _FIELDS_DYNAMIC = utils.FieldSet()
2188
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2189

    
2190
  def ExpandNames(self):
2191
    self.needed_locks = {}
2192

    
2193
    _CheckOutputFields(static=self._FIELDS_STATIC,
2194
                       dynamic=self._FIELDS_DYNAMIC,
2195
                       selected=self.op.output_fields)
2196

    
2197
  def CheckPrereq(self):
2198
    """No prerequisites.
2199

2200
    """
2201
    pass
2202

    
2203
  def Exec(self, feedback_fn):
2204
    """Dump a representation of the cluster config to the standard output.
2205

2206
    """
2207
    values = []
2208
    for field in self.op.output_fields:
2209
      if field == "cluster_name":
2210
        entry = self.cfg.GetClusterName()
2211
      elif field == "master_node":
2212
        entry = self.cfg.GetMasterNode()
2213
      elif field == "drain_flag":
2214
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2215
      else:
2216
        raise errors.ParameterError(field)
2217
      values.append(entry)
2218
    return values
2219

    
2220

    
2221
class LUActivateInstanceDisks(NoHooksLU):
2222
  """Bring up an instance's disks.
2223

2224
  """
2225
  _OP_REQP = ["instance_name"]
2226
  REQ_BGL = False
2227

    
2228
  def ExpandNames(self):
2229
    self._ExpandAndLockInstance()
2230
    self.needed_locks[locking.LEVEL_NODE] = []
2231
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2232

    
2233
  def DeclareLocks(self, level):
2234
    if level == locking.LEVEL_NODE:
2235
      self._LockInstancesNodes()
2236

    
2237
  def CheckPrereq(self):
2238
    """Check prerequisites.
2239

2240
    This checks that the instance is in the cluster.
2241

2242
    """
2243
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2244
    assert self.instance is not None, \
2245
      "Cannot retrieve locked instance %s" % self.op.instance_name
2246

    
2247
  def Exec(self, feedback_fn):
2248
    """Activate the disks.
2249

2250
    """
2251
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2252
    if not disks_ok:
2253
      raise errors.OpExecError("Cannot activate block devices")
2254

    
2255
    return disks_info
2256

    
2257

    
2258
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2259
  """Prepare the block devices for an instance.
2260

2261
  This sets up the block devices on all nodes.
2262

2263
  @type lu: L{LogicalUnit}
2264
  @param lu: the logical unit on whose behalf we execute
2265
  @type instance: L{objects.Instance}
2266
  @param instance: the instance for whose disks we assemble
2267
  @type ignore_secondaries: boolean
2268
  @param ignore_secondaries: if true, errors on secondary nodes
2269
      won't result in an error return from the function
2270
  @return: False if the operation failed, otherwise a list of
2271
      (host, instance_visible_name, node_visible_name)
2272
      with the mapping from node devices to instance devices
2273

2274
  """
2275
  device_info = []
2276
  disks_ok = True
2277
  iname = instance.name
2278
  # With the two passes mechanism we try to reduce the window of
2279
  # opportunity for the race condition of switching DRBD to primary
2280
  # before handshaking occurred, but we do not eliminate it
2281

    
2282
  # The proper fix would be to wait (with some limits) until the
2283
  # connection has been made and drbd transitions from WFConnection
2284
  # into any other network-connected state (Connected, SyncTarget,
2285
  # SyncSource, etc.)
2286

    
2287
  # 1st pass, assemble on all nodes in secondary mode
2288
  for inst_disk in instance.disks:
2289
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2290
      lu.cfg.SetDiskID(node_disk, node)
2291
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2292
      if result.failed or not result:
2293
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2294
                           " (is_primary=False, pass=1)",
2295
                           inst_disk.iv_name, node)
2296
        if not ignore_secondaries:
2297
          disks_ok = False
2298

    
2299
  # FIXME: race condition on drbd migration to primary
2300

    
2301
  # 2nd pass, do only the primary node
2302
  for inst_disk in instance.disks:
2303
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2304
      if node != instance.primary_node:
2305
        continue
2306
      lu.cfg.SetDiskID(node_disk, node)
2307
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2308
      if result.failed or not result:
2309
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2310
                           " (is_primary=True, pass=2)",
2311
                           inst_disk.iv_name, node)
2312
        disks_ok = False
2313
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2314

    
2315
  # leave the disks configured for the primary node
2316
  # this is a workaround that would be fixed better by
2317
  # improving the logical/physical id handling
2318
  for disk in instance.disks:
2319
    lu.cfg.SetDiskID(disk, instance.primary_node)
2320

    
2321
  return disks_ok, device_info
2322

    
2323

    
2324
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


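# Illustrative sketch only, not part of the original module, of how the disk
# helpers above fit together; "lu" and "instance" are assumed to be an
# initialized LogicalUnit and an objects.Instance.
#
#   _StartInstanceDisks(lu, instance, force=False)  # assemble in two passes
#   try:
#     pass  # start the instance, run OS scripts, etc.
#   finally:
#     _ShutdownInstanceDisks(lu, instance)          # tear down on all nodes
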
class LUDeactivateInstanceDisks(NoHooksLU):
2340
  """Shutdown an instance's disks.
2341

2342
  """
2343
  _OP_REQP = ["instance_name"]
2344
  REQ_BGL = False
2345

    
2346
  def ExpandNames(self):
2347
    self._ExpandAndLockInstance()
2348
    self.needed_locks[locking.LEVEL_NODE] = []
2349
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2350

    
2351
  def DeclareLocks(self, level):
2352
    if level == locking.LEVEL_NODE:
2353
      self._LockInstancesNodes()
2354

    
2355
  def CheckPrereq(self):
2356
    """Check prerequisites.
2357

2358
    This checks that the instance is in the cluster.
2359

2360
    """
2361
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2362
    assert self.instance is not None, \
2363
      "Cannot retrieve locked instance %s" % self.op.instance_name
2364

    
2365
  def Exec(self, feedback_fn):
2366
    """Deactivate the disks
2367

2368
    """
2369
    instance = self.instance
2370
    _SafeShutdownInstanceDisks(self, instance)
2371

    
2372

    
2373
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                    [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if ins_l.failed or not isinstance(ins_l.data, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on nodes other than the primary always make the function
  return False; errors on the primary node do so only if
  ignore_primary is false (otherwise they are just logged).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      if result.failed or not result.data:
        logging.error("Could not shutdown block device %s on node %s",
                      disk.iv_name, node)
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  nodeinfo[node].Raise()
  free_mem = nodeinfo[node].data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


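# Illustrative usage only, not part of the original module; this mirrors the
# call made in LUStartupInstance.CheckPrereq below, with "self" being the LU
# and "instance" a configured objects.Instance:
#
#   bep = self.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
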
class LUStartupInstance(LogicalUnit):
2451
  """Starts an instance.
2452

2453
  """
2454
  HPATH = "instance-start"
2455
  HTYPE = constants.HTYPE_INSTANCE
2456
  _OP_REQP = ["instance_name", "force"]
2457
  REQ_BGL = False
2458

    
2459
  def ExpandNames(self):
2460
    self._ExpandAndLockInstance()
2461

    
2462
  def BuildHooksEnv(self):
2463
    """Build hooks env.
2464

2465
    This runs on master, primary and secondary nodes of the instance.
2466

2467
    """
2468
    env = {
2469
      "FORCE": self.op.force,
2470
      }
2471
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2472
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2473
          list(self.instance.secondary_nodes))
2474
    return env, nl, nl
2475

    
2476
  def CheckPrereq(self):
2477
    """Check prerequisites.
2478

2479
    This checks that the instance is in the cluster.
2480

2481
    """
2482
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2483
    assert self.instance is not None, \
2484
      "Cannot retrieve locked instance %s" % self.op.instance_name
2485

    
2486
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2487
    # check bridges existence
2488
    _CheckInstanceBridgesExist(self, instance)
2489

    
2490
    _CheckNodeFreeMemory(self, instance.primary_node,
2491
                         "starting instance %s" % instance.name,
2492
                         bep[constants.BE_MEMORY], instance.hypervisor)
2493

    
2494
  def Exec(self, feedback_fn):
2495
    """Start the instance.
2496

2497
    """
2498
    instance = self.instance
2499
    force = self.op.force
2500
    extra_args = getattr(self.op, "extra_args", "")
2501

    
2502
    self.cfg.MarkInstanceUp(instance.name)
2503

    
2504
    node_current = instance.primary_node
2505

    
2506
    _StartInstanceDisks(self, instance, force)
2507

    
2508
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2509
    if result.failed or not result.data:
2510
      _ShutdownInstanceDisks(self, instance)
2511
      raise errors.OpExecError("Could not start instance")
2512

    
2513

    
2514
class LURebootInstance(LogicalUnit):
2515
  """Reboot an instance.
2516

2517
  """
2518
  HPATH = "instance-reboot"
2519
  HTYPE = constants.HTYPE_INSTANCE
2520
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2521
  REQ_BGL = False
2522

    
2523
  def ExpandNames(self):
2524
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2525
                                   constants.INSTANCE_REBOOT_HARD,
2526
                                   constants.INSTANCE_REBOOT_FULL]:
2527
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2528
                                  (constants.INSTANCE_REBOOT_SOFT,
2529
                                   constants.INSTANCE_REBOOT_HARD,
2530
                                   constants.INSTANCE_REBOOT_FULL))
2531
    self._ExpandAndLockInstance()
2532

    
2533
  def BuildHooksEnv(self):
2534
    """Build hooks env.
2535

2536
    This runs on master, primary and secondary nodes of the instance.
2537

2538
    """
2539
    env = {
2540
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2541
      }
2542
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2543
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2544
          list(self.instance.secondary_nodes))
2545
    return env, nl, nl
2546

    
2547
  def CheckPrereq(self):
2548
    """Check prerequisites.
2549

2550
    This checks that the instance is in the cluster.
2551

2552
    """
2553
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2554
    assert self.instance is not None, \
2555
      "Cannot retrieve locked instance %s" % self.op.instance_name
2556

    
2557
    # check bridges existence
2558
    _CheckInstanceBridgesExist(self, instance)
2559

    
2560
  def Exec(self, feedback_fn):
2561
    """Reboot the instance.
2562

2563
    """
2564
    instance = self.instance
2565
    ignore_secondaries = self.op.ignore_secondaries
2566
    reboot_type = self.op.reboot_type
2567
    extra_args = getattr(self.op, "extra_args", "")
2568

    
2569
    node_current = instance.primary_node
2570

    
2571
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2572
                       constants.INSTANCE_REBOOT_HARD]:
2573
      result = self.rpc.call_instance_reboot(node_current, instance,
2574
                                             reboot_type, extra_args)
2575
      if result.failed or not result.data:
2576
        raise errors.OpExecError("Could not reboot instance")
2577
    else:
2578
      result = self.rpc.call_instance_shutdown(node_current, instance)
      if result.failed or not result.data:
        raise errors.OpExecError("Could not shutdown instance for full reboot")
2580
      _ShutdownInstanceDisks(self, instance)
2581
      _StartInstanceDisks(self, instance, ignore_secondaries)
2582
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2583
      if result.failed or not result.data:
2584
        _ShutdownInstanceDisks(self, instance)
2585
        raise errors.OpExecError("Could not start instance for full reboot")
2586

    
2587
    self.cfg.MarkInstanceUp(instance.name)
2588

    
2589

    
2590
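# Illustrative summary, not part of the original module: LURebootInstance
# above delegates SOFT and HARD reboots to a single call_instance_reboot RPC
# on the primary node, while FULL is implemented as shutdown, disk
# deactivation/activation and start.  The opcode and its fields below are
# assumptions matching the attributes this LU reads from self.op.
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_HARD,
#                                 ignore_secondaries=False)
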
class LUShutdownInstance(LogicalUnit):
2591
  """Shutdown an instance.
2592

2593
  """
2594
  HPATH = "instance-stop"
2595
  HTYPE = constants.HTYPE_INSTANCE
2596
  _OP_REQP = ["instance_name"]
2597
  REQ_BGL = False
2598

    
2599
  def ExpandNames(self):
2600
    self._ExpandAndLockInstance()
2601

    
2602
  def BuildHooksEnv(self):
2603
    """Build hooks env.
2604

2605
    This runs on master, primary and secondary nodes of the instance.
2606

2607
    """
2608
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2609
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2610
          list(self.instance.secondary_nodes))
2611
    return env, nl, nl
2612

    
2613
  def CheckPrereq(self):
2614
    """Check prerequisites.
2615

2616
    This checks that the instance is in the cluster.
2617

2618
    """
2619
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2620
    assert self.instance is not None, \
2621
      "Cannot retrieve locked instance %s" % self.op.instance_name
2622

    
2623
  def Exec(self, feedback_fn):
2624
    """Shutdown the instance.
2625

2626
    """
2627
    instance = self.instance
2628
    node_current = instance.primary_node
2629
    self.cfg.MarkInstanceDown(instance.name)
2630
    result = self.rpc.call_instance_shutdown(node_current, instance)
2631
    if result.failed or not result.data:
2632
      self.proc.LogWarning("Could not shutdown instance")
2633

    
2634
    _ShutdownInstanceDisks(self, instance)
2635

    
2636

    
2637
class LUReinstallInstance(LogicalUnit):
2638
  """Reinstall an instance.
2639

2640
  """
2641
  HPATH = "instance-reinstall"
2642
  HTYPE = constants.HTYPE_INSTANCE
2643
  _OP_REQP = ["instance_name"]
2644
  REQ_BGL = False
2645

    
2646
  def ExpandNames(self):
2647
    self._ExpandAndLockInstance()
2648

    
2649
  def BuildHooksEnv(self):
2650
    """Build hooks env.
2651

2652
    This runs on master, primary and secondary nodes of the instance.
2653

2654
    """
2655
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2656
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2657
          list(self.instance.secondary_nodes))
2658
    return env, nl, nl
2659

    
2660
  def CheckPrereq(self):
2661
    """Check prerequisites.
2662

2663
    This checks that the instance is in the cluster and is not running.
2664

2665
    """
2666
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2667
    assert instance is not None, \
2668
      "Cannot retrieve locked instance %s" % self.op.instance_name
2669

    
2670
    if instance.disk_template == constants.DT_DISKLESS:
2671
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2672
                                 self.op.instance_name)
2673
    if instance.status != "down":
2674
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2675
                                 self.op.instance_name)
2676
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2677
                                              instance.name,
2678
                                              instance.hypervisor)
2679
    if remote_info.failed or remote_info.data:
2680
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2681
                                 (self.op.instance_name,
2682
                                  instance.primary_node))
2683

    
2684
    self.op.os_type = getattr(self.op, "os_type", None)
2685
    if self.op.os_type is not None:
2686
      # OS verification
2687
      pnode = self.cfg.GetNodeInfo(
2688
        self.cfg.ExpandNodeName(instance.primary_node))
2689
      if pnode is None:
2690
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2691
                                   self.op.pnode)
2692
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2693
      result.Raise()
2694
      if not isinstance(result.data, objects.OS):
2695
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2696
                                   " primary node"  % self.op.os_type)
2697

    
2698
    self.instance = instance
2699

    
2700
  def Exec(self, feedback_fn):
2701
    """Reinstall the instance.
2702

2703
    """
2704
    inst = self.instance
2705

    
2706
    if self.op.os_type is not None:
2707
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2708
      inst.os = self.op.os_type
2709
      self.cfg.Update(inst)
2710

    
2711
    _StartInstanceDisks(self, inst, None)
2712
    try:
2713
      feedback_fn("Running the instance OS create scripts...")
2714
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2715
      result.Raise()
2716
      if not result.data:
2717
        raise errors.OpExecError("Could not install OS for instance %s"
2718
                                 " on node %s" %
2719
                                 (inst.name, inst.primary_node))
2720
    finally:
2721
      _ShutdownInstanceDisks(self, inst)
2722

    
2723

    
2724
class LURenameInstance(LogicalUnit):
2725
  """Rename an instance.
2726

2727
  """
2728
  HPATH = "instance-rename"
2729
  HTYPE = constants.HTYPE_INSTANCE
2730
  _OP_REQP = ["instance_name", "new_name"]
2731

    
2732
  def BuildHooksEnv(self):
2733
    """Build hooks env.
2734

2735
    This runs on master, primary and secondary nodes of the instance.
2736

2737
    """
2738
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2739
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2740
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2741
          list(self.instance.secondary_nodes))
2742
    return env, nl, nl
2743

    
2744
  def CheckPrereq(self):
2745
    """Check prerequisites.
2746

2747
    This checks that the instance is in the cluster and is not running.
2748

2749
    """
2750
    instance = self.cfg.GetInstanceInfo(
2751
      self.cfg.ExpandInstanceName(self.op.instance_name))
2752
    if instance is None:
2753
      raise errors.OpPrereqError("Instance '%s' not known" %
2754
                                 self.op.instance_name)
2755
    if instance.status != "down":
2756
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2757
                                 self.op.instance_name)
2758
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2759
                                              instance.name,
2760
                                              instance.hypervisor)
2761
    remote_info.Raise()
2762
    if remote_info.data:
2763
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2764
                                 (self.op.instance_name,
2765
                                  instance.primary_node))
2766
    self.instance = instance
2767

    
2768
    # new name verification
2769
    name_info = utils.HostInfo(self.op.new_name)
2770

    
2771
    self.op.new_name = new_name = name_info.name
2772
    instance_list = self.cfg.GetInstanceList()
2773
    if new_name in instance_list:
2774
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2775
                                 new_name)
2776

    
2777
    if not getattr(self.op, "ignore_ip", False):
2778
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2779
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2780
                                   (name_info.ip, new_name))
2781

    
2782

    
2783
  def Exec(self, feedback_fn):
2784
    """Reinstall the instance.
2785

2786
    """
2787
    inst = self.instance
2788
    old_name = inst.name
2789

    
2790
    if inst.disk_template == constants.DT_FILE:
2791
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2792

    
2793
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2794
    # Change the instance lock. This is definitely safe while we hold the BGL
2795
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2796
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2797

    
2798
    # re-read the instance from the configuration after rename
2799
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2800

    
2801
    if inst.disk_template == constants.DT_FILE:
2802
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2803
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2804
                                                     old_file_storage_dir,
2805
                                                     new_file_storage_dir)
2806
      result.Raise()
2807
      if not result.data:
2808
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2809
                                 " directory '%s' to '%s' (but the instance"
2810
                                 " has been renamed in Ganeti)" % (
2811
                                 inst.primary_node, old_file_storage_dir,
2812
                                 new_file_storage_dir))
2813

    
2814
      if not result.data[0]:
2815
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2816
                                 " (but the instance has been renamed in"
2817
                                 " Ganeti)" % (old_file_storage_dir,
2818
                                               new_file_storage_dir))
2819

    
2820
    _StartInstanceDisks(self, inst, None)
2821
    try:
2822
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2823
                                                 old_name)
2824
      if result.failed or not result.data:
2825
        msg = ("Could not run OS rename script for instance %s on node %s"
2826
               " (but the instance has been renamed in Ganeti)" %
2827
               (inst.name, inst.primary_node))
2828
        self.proc.LogWarning(msg)
2829
    finally:
2830
      _ShutdownInstanceDisks(self, inst)
2831

    
2832

    
2833
class LURemoveInstance(LogicalUnit):
2834
  """Remove an instance.
2835

2836
  """
2837
  HPATH = "instance-remove"
2838
  HTYPE = constants.HTYPE_INSTANCE
2839
  _OP_REQP = ["instance_name", "ignore_failures"]
2840
  REQ_BGL = False
2841

    
2842
  def ExpandNames(self):
2843
    self._ExpandAndLockInstance()
2844
    self.needed_locks[locking.LEVEL_NODE] = []
2845
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2846

    
2847
  def DeclareLocks(self, level):
2848
    if level == locking.LEVEL_NODE:
2849
      self._LockInstancesNodes()
2850

    
2851
  def BuildHooksEnv(self):
2852
    """Build hooks env.
2853

2854
    This runs on master, primary and secondary nodes of the instance.
2855

2856
    """
2857
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2858
    nl = [self.cfg.GetMasterNode()]
2859
    return env, nl, nl
2860

    
2861
  def CheckPrereq(self):
2862
    """Check prerequisites.
2863

2864
    This checks that the instance is in the cluster.
2865

2866
    """
2867
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2868
    assert self.instance is not None, \
2869
      "Cannot retrieve locked instance %s" % self.op.instance_name
2870

    
2871
  def Exec(self, feedback_fn):
2872
    """Remove the instance.
2873

2874
    """
2875
    instance = self.instance
2876
    logging.info("Shutting down instance %s on node %s",
2877
                 instance.name, instance.primary_node)
2878

    
2879
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2880
    if result.failed or not result.data:
2881
      if self.op.ignore_failures:
2882
        feedback_fn("Warning: can't shutdown instance")
2883
      else:
2884
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2885
                                 (instance.name, instance.primary_node))
2886

    
2887
    logging.info("Removing block devices for instance %s", instance.name)
2888

    
2889
    if not _RemoveDisks(self, instance):
2890
      if self.op.ignore_failures:
2891
        feedback_fn("Warning: can't remove instance's disks")
2892
      else:
2893
        raise errors.OpExecError("Can't remove instance's disks")
2894

    
2895
    logging.info("Removing instance %s out of cluster config", instance.name)
2896

    
2897
    self.cfg.RemoveInstance(instance.name)
2898
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2899

    
2900

    
2901
class LUQueryInstances(NoHooksLU):
2902
  """Logical unit for querying instances.
2903

2904
  """
2905
  _OP_REQP = ["output_fields", "names"]
2906
  REQ_BGL = False
2907
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2908
                                    "admin_state", "admin_ram",
2909
                                    "disk_template", "ip", "mac", "bridge",
2910
                                    "sda_size", "sdb_size", "vcpus", "tags",
2911
                                    "network_port", "beparams",
2912
                                    "(disk).(size)/([0-9]+)",
2913
                                    "(disk).(sizes)",
2914
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2915
                                    "(nic).(macs|ips|bridges)",
2916
                                    "(disk|nic).(count)",
2917
                                    "serial_no", "hypervisor", "hvparams",] +
2918
                                  ["hv/%s" % name
2919
                                   for name in constants.HVS_PARAMETERS] +
2920
                                  ["be/%s" % name
2921
                                   for name in constants.BES_PARAMETERS])
2922
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2923

    
2924

    
2925
  def ExpandNames(self):
2926
    _CheckOutputFields(static=self._FIELDS_STATIC,
2927
                       dynamic=self._FIELDS_DYNAMIC,
2928
                       selected=self.op.output_fields)
2929

    
2930
    self.needed_locks = {}
2931
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2932
    self.share_locks[locking.LEVEL_NODE] = 1
2933

    
2934
    if self.op.names:
2935
      self.wanted = _GetWantedInstances(self, self.op.names)
2936
    else:
2937
      self.wanted = locking.ALL_SET
2938

    
2939
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2940
    if self.do_locking:
2941
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2942
      self.needed_locks[locking.LEVEL_NODE] = []
2943
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2944

    
2945
  def DeclareLocks(self, level):
2946
    if level == locking.LEVEL_NODE and self.do_locking:
2947
      self._LockInstancesNodes()
2948

    
2949
  def CheckPrereq(self):
2950
    """Check prerequisites.
2951

2952
    """
2953
    pass
2954

    
2955
  def Exec(self, feedback_fn):
2956
    """Computes the list of nodes and their attributes.
2957

2958
    """
2959
    all_info = self.cfg.GetAllInstancesInfo()
2960
    if self.do_locking:
2961
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2962
    elif self.wanted != locking.ALL_SET:
2963
      instance_names = self.wanted
2964
      missing = set(instance_names).difference(all_info.keys())
2965
      if missing:
2966
        raise errors.OpExecError(
2967
          "Some instances were removed before retrieving their data: %s"
2968
          % missing)
2969
    else:
2970
      instance_names = all_info.keys()
2971

    
2972
    instance_names = utils.NiceSort(instance_names)
2973
    instance_list = [all_info[iname] for iname in instance_names]
2974

    
2975
    # begin data gathering
2976

    
2977
    nodes = frozenset([inst.primary_node for inst in instance_list])
2978
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2979

    
2980
    bad_nodes = []
2981
    if self.do_locking:
2982
      live_data = {}
2983
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2984
      for name in nodes:
2985
        result = node_data[name]
2986
        if result.failed:
2987
          bad_nodes.append(name)
2988
        else:
2989
          if result.data:
2990
            live_data.update(result.data)
2991
            # else no instance is alive
2992
    else:
2993
      live_data = dict([(name, {}) for name in instance_names])
2994

    
2995
    # end data gathering
2996

    
2997
    HVPREFIX = "hv/"
2998
    BEPREFIX = "be/"
2999
    output = []
3000
    for instance in instance_list:
3001
      iout = []
3002
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3003
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3004
      for field in self.op.output_fields:
3005
        st_match = self._FIELDS_STATIC.Matches(field)
3006
        if field == "name":
3007
          val = instance.name
3008
        elif field == "os":
3009
          val = instance.os
3010
        elif field == "pnode":
3011
          val = instance.primary_node
3012
        elif field == "snodes":
3013
          val = list(instance.secondary_nodes)
3014
        elif field == "admin_state":
3015
          val = (instance.status != "down")
3016
        elif field == "oper_state":
3017
          if instance.primary_node in bad_nodes:
3018
            val = None
3019
          else:
3020
            val = bool(live_data.get(instance.name))
3021
        elif field == "status":
3022
          if instance.primary_node in bad_nodes:
3023
            val = "ERROR_nodedown"
3024
          else:
3025
            running = bool(live_data.get(instance.name))
3026
            if running:
3027
              if instance.status != "down":
3028
                val = "running"
3029
              else:
3030
                val = "ERROR_up"
3031
            else:
3032
              if instance.status != "down":
3033
                val = "ERROR_down"
3034
              else:
3035
                val = "ADMIN_down"
3036
        elif field == "oper_ram":
3037
          if instance.primary_node in bad_nodes:
3038
            val = None
3039
          elif instance.name in live_data:
3040
            val = live_data[instance.name].get("memory", "?")
3041
          else:
3042
            val = "-"
3043
        elif field == "disk_template":
3044
          val = instance.disk_template
3045
        elif field == "ip":
3046
          val = instance.nics[0].ip
3047
        elif field == "bridge":
3048
          val = instance.nics[0].bridge
3049
        elif field == "mac":
3050
          val = instance.nics[0].mac
3051
        elif field == "sda_size" or field == "sdb_size":
3052
          idx = ord(field[2]) - ord('a')
3053
          try:
3054
            val = instance.FindDisk(idx).size
3055
          except errors.OpPrereqError:
3056
            val = None
3057
        elif field == "tags":
3058
          val = list(instance.GetTags())
3059
        elif field == "serial_no":
3060
          val = instance.serial_no
3061
        elif field == "network_port":
3062
          val = instance.network_port
3063
        elif field == "hypervisor":
3064
          val = instance.hypervisor
3065
        elif field == "hvparams":
3066
          val = i_hv
3067
        elif (field.startswith(HVPREFIX) and
3068
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3069
          val = i_hv.get(field[len(HVPREFIX):], None)
3070
        elif field == "beparams":
3071
          val = i_be
3072
        elif (field.startswith(BEPREFIX) and
3073
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3074
          val = i_be.get(field[len(BEPREFIX):], None)
3075
        elif st_match and st_match.groups():
3076
          # matches a variable list
3077
          st_groups = st_match.groups()
3078
          if st_groups and st_groups[0] == "disk":
3079
            if st_groups[1] == "count":
3080
              val = len(instance.disks)
3081
            elif st_groups[1] == "sizes":
3082
              val = [disk.size for disk in instance.disks]
3083
            elif st_groups[1] == "size":
3084
              try:
3085
                val = instance.FindDisk(st_groups[2]).size
3086
              except errors.OpPrereqError:
3087
                val = None
3088
            else:
3089
              assert False, "Unhandled disk parameter"
3090
          elif st_groups[0] == "nic":
3091
            if st_groups[1] == "count":
3092
              val = len(instance.nics)
3093
            elif st_groups[1] == "macs":
3094
              val = [nic.mac for nic in instance.nics]
3095
            elif st_groups[1] == "ips":
3096
              val = [nic.ip for nic in instance.nics]
3097
            elif st_groups[1] == "bridges":
3098
              val = [nic.bridge for nic in instance.nics]
3099
            else:
3100
              # index-based item
3101
              nic_idx = int(st_groups[2])
3102
              if nic_idx >= len(instance.nics):
3103
                val = None
3104
              else:
3105
                if st_groups[1] == "mac":
3106
                  val = instance.nics[nic_idx].mac
3107
                elif st_groups[1] == "ip":
3108
                  val = instance.nics[nic_idx].ip
3109
                elif st_groups[1] == "bridge":
3110
                  val = instance.nics[nic_idx].bridge
3111
                else:
3112
                  assert False, "Unhandled NIC parameter"
3113
          else:
3114
            assert False, "Unhandled variable parameter"
3115
        else:
3116
          raise errors.ParameterError(field)
3117
        iout.append(val)
3118
      output.append(iout)
3119

    
3120
    return output
3121

    
3122

    
3123
class LUFailoverInstance(LogicalUnit):
3124
  """Failover an instance.
3125

3126
  """
3127
  HPATH = "instance-failover"
3128
  HTYPE = constants.HTYPE_INSTANCE
3129
  _OP_REQP = ["instance_name", "ignore_consistency"]
3130
  REQ_BGL = False
3131

    
3132
  def ExpandNames(self):
3133
    self._ExpandAndLockInstance()
3134
    self.needed_locks[locking.LEVEL_NODE] = []
3135
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3136

    
3137
  def DeclareLocks(self, level):
3138
    if level == locking.LEVEL_NODE:
3139
      self._LockInstancesNodes()
3140

    
3141
  def BuildHooksEnv(self):
3142
    """Build hooks env.
3143

3144
    This runs on master, primary and secondary nodes of the instance.
3145

3146
    """
3147
    env = {
3148
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3149
      }
3150
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3151
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3152
    return env, nl, nl
3153

    
3154
  def CheckPrereq(self):
3155
    """Check prerequisites.
3156

3157
    This checks that the instance is in the cluster.
3158

3159
    """
3160
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3161
    assert self.instance is not None, \
3162
      "Cannot retrieve locked instance %s" % self.op.instance_name
3163

    
3164
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3165
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3166
      raise errors.OpPrereqError("Instance's disk layout is not"
3167
                                 " network mirrored, cannot failover.")
3168

    
3169
    secondary_nodes = instance.secondary_nodes
3170
    if not secondary_nodes:
3171
      raise errors.ProgrammerError("no secondary node but using "
3172
                                   "a mirrored disk template")
3173

    
3174
    target_node = secondary_nodes[0]
3175
    # check memory requirements on the secondary node
3176
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3177
                         instance.name, bep[constants.BE_MEMORY],
3178
                         instance.hypervisor)
3179

    
3180
    # check bridge existance
3181
    brlist = [nic.bridge for nic in instance.nics]
3182
    result = self.rpc.call_bridges_exist(target_node, brlist)
3183
    result.Raise()
3184
    if not result.data:
3185
      raise errors.OpPrereqError("One or more target bridges %s does not"
3186
                                 " exist on destination node '%s'" %
3187
                                 (brlist, target_node))
3188

    
3189
  def Exec(self, feedback_fn):
3190
    """Failover an instance.
3191

3192
    The failover is done by shutting it down on its present node and
3193
    starting it on the secondary.
3194

3195
    """
3196
    instance = self.instance
3197

    
3198
    source_node = instance.primary_node
3199
    target_node = instance.secondary_nodes[0]
3200

    
3201
    feedback_fn("* checking disk consistency between source and target")
3202
    for dev in instance.disks:
3203
      # for drbd, these are drbd over lvm
3204
      if not _CheckDiskConsistency(self, dev, target_node, False):
3205
        if instance.status == "up" and not self.op.ignore_consistency:
3206
          raise errors.OpExecError("Disk %s is degraded on target node,"
3207
                                   " aborting failover." % dev.iv_name)
3208

    
3209
    feedback_fn("* shutting down instance on source node")
3210
    logging.info("Shutting down instance %s on node %s",
3211
                 instance.name, source_node)
3212

    
3213
    result = self.rpc.call_instance_shutdown(source_node, instance)
3214
    if result.failed or not result.data:
3215
      if self.op.ignore_consistency:
3216
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3217
                             " Proceeding"
3218
                             " anyway. Please make sure node %s is down",
3219
                             instance.name, source_node, source_node)
3220
      else:
3221
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3222
                                 (instance.name, source_node))
3223

    
3224
    feedback_fn("* deactivating the instance's disks on source node")
3225
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3226
      raise errors.OpExecError("Can't shut down the instance's disks.")
3227

    
3228
    instance.primary_node = target_node
3229
    # distribute new instance config to the other nodes
3230
    self.cfg.Update(instance)
3231

    
3232
    # Only start the instance if it's marked as up
3233
    if instance.status == "up":
3234
      feedback_fn("* activating the instance's disks on target node")
3235
      logging.info("Starting instance %s on node %s",
3236
                   instance.name, target_node)
3237

    
3238
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3239
                                               ignore_secondaries=True)
3240
      if not disks_ok:
3241
        _ShutdownInstanceDisks(self, instance)
3242
        raise errors.OpExecError("Can't activate the instance's disks")
3243

    
3244
      feedback_fn("* starting the instance on the target node")
3245
      result = self.rpc.call_instance_start(target_node, instance, None)
3246
      if result.failed or not result.data:
3247
        _ShutdownInstanceDisks(self, instance)
3248
        raise errors.OpExecError("Could not start instance %s on node %s." %
3249
                                 (instance.name, target_node))
3250

    
3251

    
3252
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3253
  """Create a tree of block devices on the primary node.
3254

3255
  This always creates all devices.
3256

3257
  """
3258
  if device.children:
3259
    for child in device.children:
3260
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3261
        return False
3262

    
3263
  lu.cfg.SetDiskID(device, node)
3264
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3265
                                       instance.name, True, info)
3266
  if new_id.failed or not new_id.data:
3267
    return False
3268
  if device.physical_id is None:
3269
    device.physical_id = new_id
3270
  return True
3271

    
3272

    
3273
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3274
  """Create a tree of block devices on a secondary node.
3275

3276
  If this device type has to be created on secondaries, create it and
3277
  all its children.
3278

3279
  If not, just recurse to children keeping the same 'force' value.
3280

3281
  """
3282
  if device.CreateOnSecondary():
3283
    force = True
3284
  if device.children:
3285
    for child in device.children:
3286
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3287
                                        child, force, info):
3288
        return False
3289

    
3290
  if not force:
3291
    return True
3292
  lu.cfg.SetDiskID(device, node)
3293
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3294
                                       instance.name, False, info)
3295
  if new_id.failed or not new_id.data:
3296
    return False
3297
  if device.physical_id is None:
3298
    device.physical_id = new_id
3299
  return True
3300

    
3301

    
3302
def _GenerateUniqueNames(lu, exts):
3303
  """Generate a suitable LV name.
3304

3305
  This will generate a logical volume name for the given instance.
3306

3307
  """
3308
  results = []
3309
  for val in exts:
3310
    new_id = lu.cfg.GenerateUniqueID()
3311
    results.append("%s%s" % (new_id, val))
3312
  return results
3313

    
3314

    
3315
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3316
                         p_minor, s_minor):
3317
  """Generate a drbd8 device complete with its children.
3318

3319
  """
3320
  port = lu.cfg.AllocatePort()
3321
  vgname = lu.cfg.GetVGName()
3322
  shared_secret = lu.cfg.GenerateDRBDSecret()
3323
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3324
                          logical_id=(vgname, names[0]))
3325
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3326
                          logical_id=(vgname, names[1]))
3327
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3328
                          logical_id=(primary, secondary, port,
3329
                                      p_minor, s_minor,
3330
                                      shared_secret),
3331
                          children=[dev_data, dev_meta],
3332
                          iv_name=iv_name)
3333
  return drbd_dev
3334

    
3335

    
3336
def _GenerateDiskTemplate(lu, template_name,
3337
                          instance_name, primary_node,
3338
                          secondary_nodes, disk_info,
3339
                          file_storage_dir, file_driver,
3340
                          base_index):
3341
  """Generate the entire disk layout for a given template type.
3342

3343
  """
3344
  #TODO: compute space requirements
3345

    
3346
  vgname = lu.cfg.GetVGName()
3347
  disk_count = len(disk_info)
3348
  disks = []
3349
  if template_name == constants.DT_DISKLESS:
3350
    pass
3351
  elif template_name == constants.DT_PLAIN:
3352
    if len(secondary_nodes) != 0:
3353
      raise errors.ProgrammerError("Wrong template configuration")
3354

    
3355
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3356
                                      for i in range(disk_count)])
3357
    for idx, disk in enumerate(disk_info):
3358
      disk_index = idx + base_index
3359
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3360
                              logical_id=(vgname, names[idx]),
3361
                              iv_name="disk/%d" % disk_index)
3362
      disks.append(disk_dev)
3363
  elif template_name == constants.DT_DRBD8:
3364
    if len(secondary_nodes) != 1:
3365
      raise errors.ProgrammerError("Wrong template configuration")
3366
    remote_node = secondary_nodes[0]
3367
    minors = lu.cfg.AllocateDRBDMinor(
3368
      [primary_node, remote_node] * len(disk_info), instance_name)
3369

    
3370
    names = _GenerateUniqueNames(lu,
3371
                                 [".disk%d_%s" % (i, s)
3372
                                  for i in range(disk_count)
3373
                                  for s in ("data", "meta")
3374
                                  ])
3375
    for idx, disk in enumerate(disk_info):
3376
      disk_index = idx + base_index
3377
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3378
                                      disk["size"], names[idx*2:idx*2+2],
3379
                                      "disk/%d" % disk_index,
3380
                                      minors[idx*2], minors[idx*2+1])
3381
      disks.append(disk_dev)
3382
  elif template_name == constants.DT_FILE:
3383
    if len(secondary_nodes) != 0:
3384
      raise errors.ProgrammerError("Wrong template configuration")
3385

    
3386
    for idx, disk in enumerate(disk_info):
3387
      disk_index = idx + base_index
3388
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3389
                              iv_name="disk/%d" % disk_index,
3390
                              logical_id=(file_driver,
3391
                                          "%s/disk%d" % (file_storage_dir,
3392
                                                         idx)))
3393
      disks.append(disk_dev)
3394
  else:
3395
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3396
  return disks
3397

    
3398

    
3399
def _GetInstanceInfoText(instance):
3400
  """Compute that text that should be added to the disk's metadata.
3401

3402
  """
3403
  return "originstname+%s" % instance.name
3404

    
3405

    
3406
def _CreateDisks(lu, instance):
3407
  """Create all disks for an instance.
3408

3409
  This abstracts away some work from AddInstance.
3410

3411
  @type lu: L{LogicalUnit}
3412
  @param lu: the logical unit on whose behalf we execute
3413
  @type instance: L{objects.Instance}
3414
  @param instance: the instance whose disks we should create
3415
  @rtype: boolean
3416
  @return: the success of the creation
3417

3418
  """
3419
  info = _GetInstanceInfoText(instance)
3420

    
3421
  if instance.disk_template == constants.DT_FILE:
3422
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3423
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3424
                                                 file_storage_dir)
3425

    
3426
    if result.failed or not result.data:
3427
      logging.error("Could not connect to node '%s'", instance.primary_node)
3428
      return False
3429

    
3430
    if not result.data[0]:
3431
      logging.error("Failed to create directory '%s'", file_storage_dir)
3432
      return False
3433

    
3434
  # Note: this needs to be kept in sync with adding of disks in
3435
  # LUSetInstanceParams
3436
  for device in instance.disks:
3437
    logging.info("Creating volume %s for instance %s",
3438
                 device.iv_name, instance.name)
3439
    #HARDCODE
3440
    for secondary_node in instance.secondary_nodes:
3441
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3442
                                        device, False, info):
3443
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3444
                      device.iv_name, device, secondary_node)
3445
        return False
3446
    #HARDCODE
3447
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3448
                                    instance, device, info):
3449
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3450
      return False
3451

    
3452
  return True
3453

    
3454

    
3455
def _RemoveDisks(lu, instance):
3456
  """Remove all disks for an instance.
3457

3458
  This abstracts away some work from `AddInstance()` and
3459
  `RemoveInstance()`. Note that in case some of the devices couldn't
3460
  be removed, the removal will continue with the other ones (compare
3461
  with `_CreateDisks()`).
3462

3463
  @type lu: L{LogicalUnit}
3464
  @param lu: the logical unit on whose behalf we execute
3465
  @type instance: L{objects.Instance}
3466
  @param instance: the instance whose disks we should remove
3467
  @rtype: boolean
3468
  @return: the success of the removal
3469

3470
  """
3471
  logging.info("Removing block devices for instance %s", instance.name)
3472

    
3473
  result = True
3474
  for device in instance.disks:
3475
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3476
      lu.cfg.SetDiskID(disk, node)
3477
      result = lu.rpc.call_blockdev_remove(node, disk)
3478
      if result.failed or not result.data:
3479
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3480
                           " continuing anyway", device.iv_name, node)
3481
        result = False
3482

    
3483
  if instance.disk_template == constants.DT_FILE:
3484
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3485
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3486
                                                 file_storage_dir)
3487
    if result.failed or not result.data:
3488
      logging.error("Could not remove directory '%s'", file_storage_dir)
3489
      result = False
3490

    
3491
  return result
3492

    
3493

    
3494
def _ComputeDiskSize(disk_template, disks):
3495
  """Compute disk size requirements in the volume group
3496

3497
  """
3498
  # Required free disk space as a function of disk and swap space
3499
  req_size_dict = {
3500
    constants.DT_DISKLESS: None,
3501
    constants.DT_PLAIN: sum(d["size"] for d in disks),
3502
    # 128 MB are added for drbd metadata for each disk
3503
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3504
    constants.DT_FILE: None,
3505
  }
3506

    
3507
  if disk_template not in req_size_dict:
3508
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3509
                                 " is unknown" %  disk_template)
3510

    
3511
  return req_size_dict[disk_template]
3512

    
3513

    
3514
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3515
  """Hypervisor parameter validation.
3516

3517
  This function abstract the hypervisor parameter validation to be
3518
  used in both instance create and instance modify.
3519

3520
  @type lu: L{LogicalUnit}
3521
  @param lu: the logical unit for which we check
3522
  @type nodenames: list
3523
  @param nodenames: the list of nodes on which we should check
3524
  @type hvname: string
3525
  @param hvname: the name of the hypervisor we should use
3526
  @type hvparams: dict
3527
  @param hvparams: the parameters which we need to check
3528
  @raise errors.OpPrereqError: if the parameters are not valid
3529

3530
  """
3531
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3532
                                                  hvname,
3533
                                                  hvparams)
3534
  for node in nodenames:
3535
    info = hvinfo[node]
3536
    info.Raise()
3537
    if not info.data or not isinstance(info.data, (tuple, list)):
3538
      raise errors.OpPrereqError("Cannot get current information"
3539
                                 " from node '%s' (%s)" % (node, info.data))
3540
    if not info.data[0]:
3541
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3542
                                 " %s" % info.data[1])
3543

    
3544

    
3545
class LUCreateInstance(LogicalUnit):
3546
  """Create an instance.
3547

3548
  """
3549
  HPATH = "instance-add"
3550
  HTYPE = constants.HTYPE_INSTANCE
3551
  _OP_REQP = ["instance_name", "disks", "disk_template",
3552
              "mode", "start",
3553
              "wait_for_sync", "ip_check", "nics",
3554
              "hvparams", "beparams"]
3555
  REQ_BGL = False
3556

    
3557
  def _ExpandNode(self, node):
3558
    """Expands and checks one node name.
3559

3560
    """
3561
    node_full = self.cfg.ExpandNodeName(node)
3562
    if node_full is None:
3563
      raise errors.OpPrereqError("Unknown node %s" % node)
3564
    return node_full
3565

    
3566
  def ExpandNames(self):
3567
    """ExpandNames for CreateInstance.
3568

3569
    Figure out the right locks for instance creation.
3570

3571
    """
3572
    self.needed_locks = {}
3573

    
3574
    # set optional parameters to none if they don't exist
3575
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3576
      if not hasattr(self.op, attr):
3577
        setattr(self.op, attr, None)
3578

    
3579
    # cheap checks, mostly valid constants given
3580

    
3581
    # verify creation mode
3582
    if self.op.mode not in (constants.INSTANCE_CREATE,
3583
                            constants.INSTANCE_IMPORT):
3584
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3585
                                 self.op.mode)
3586

    
3587
    # disk template and mirror node verification
3588
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3589
      raise errors.OpPrereqError("Invalid disk template name")
3590

    
3591
    if self.op.hypervisor is None:
3592
      self.op.hypervisor = self.cfg.GetHypervisorType()
3593

    
3594
    cluster = self.cfg.GetClusterInfo()
3595
    enabled_hvs = cluster.enabled_hypervisors
3596
    if self.op.hypervisor not in enabled_hvs:
3597
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3598
                                 " cluster (%s)" % (self.op.hypervisor,
3599
                                  ",".join(enabled_hvs)))
3600

    
3601
    # check hypervisor parameter syntax (locally)
3602

    
3603
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3604
                                  self.op.hvparams)
3605
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3606
    hv_type.CheckParameterSyntax(filled_hvp)
3607

    
3608
    # fill and remember the beparams dict
3609
    utils.CheckBEParams(self.op.beparams)
3610
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3611
                                    self.op.beparams)
3612

    
3613
    #### instance parameters check
3614

    
3615
    # instance name verification
3616
    hostname1 = utils.HostInfo(self.op.instance_name)
3617
    self.op.instance_name = instance_name = hostname1.name
3618

    
3619
    # this is just a preventive check, but someone might still add this
3620
    # instance in the meantime, and creation will fail at lock-add time
3621
    if instance_name in self.cfg.GetInstanceList():
3622
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3623
                                 instance_name)
3624

    
3625
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3626

    
3627
    # NIC buildup
3628
    self.nics = []
3629
    for nic in self.op.nics:
3630
      # ip validity checks
3631
      ip = nic.get("ip", None)
3632
      if ip is None or ip.lower() == "none":
3633
        nic_ip = None
3634
      elif ip.lower() == constants.VALUE_AUTO:
3635
        nic_ip = hostname1.ip
3636
      else:
3637
        if not utils.IsValidIP(ip):
3638
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3639
                                     " like a valid IP" % ip)
3640
        nic_ip = ip
3641

    
3642
      # MAC address verification
3643
      mac = nic.get("mac", constants.VALUE_AUTO)
3644
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3645
        if not utils.IsValidMac(mac.lower()):
3646
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3647
                                     mac)
3648
      # bridge verification
3649
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3650
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3651

    
3652
    # disk checks/pre-build
3653
    self.disks = []
3654
    for disk in self.op.disks:
3655
      mode = disk.get("mode", constants.DISK_RDWR)
3656
      if mode not in constants.DISK_ACCESS_SET:
3657
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3658
                                   mode)
3659
      size = disk.get("size", None)
3660
      if size is None:
3661
        raise errors.OpPrereqError("Missing disk size")
3662
      try:
3663
        size = int(size)
3664
      except ValueError:
3665
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3666
      self.disks.append({"size": size, "mode": mode})
3667

    
3668
    # used in CheckPrereq for ip ping check
3669
    self.check_ip = hostname1.ip
3670

    
3671
    # file storage checks
3672
    if (self.op.file_driver and
3673
        not self.op.file_driver in constants.FILE_DRIVER):
3674
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3675
                                 self.op.file_driver)
3676

    
3677
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3678
      raise errors.OpPrereqError("File storage directory path not absolute")
3679

    
3680
    ### Node/iallocator related checks
3681
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3682
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3683
                                 " node must be given")
3684

    
3685
    if self.op.iallocator:
3686
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3687
    else:
3688
      self.op.pnode = self._ExpandNode(self.op.pnode)
3689
      nodelist = [self.op.pnode]
3690
      if self.op.snode is not None:
3691
        self.op.snode = self._ExpandNode(self.op.snode)
3692
        nodelist.append(self.op.snode)
3693
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3694

    
3695
    # in case of import lock the source node too
3696
    if self.op.mode == constants.INSTANCE_IMPORT:
3697
      src_node = getattr(self.op, "src_node", None)
3698
      src_path = getattr(self.op, "src_path", None)
3699

    
3700
      if src_path is None:
3701
        self.op.src_path = src_path = self.op.instance_name
3702

    
3703
      if src_node is None:
3704
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3705
        self.op.src_node = None
3706
        if os.path.isabs(src_path):
3707
          raise errors.OpPrereqError("Importing an instance from an absolute"
3708
                                     " path requires a source node option.")
3709
      else:
3710
        self.op.src_node = src_node = self._ExpandNode(src_node)
3711
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3712
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
3713
        if not os.path.isabs(src_path):
3714
          self.op.src_path = src_path = \
3715
            os.path.join(constants.EXPORT_DIR, src_path)
3716

    
3717
    else: # INSTANCE_CREATE
3718
      if getattr(self.op, "os_type", None) is None:
3719
        raise errors.OpPrereqError("No guest OS specified")
3720

    
3721
  def _RunAllocator(self):
3722
    """Run the allocator based on input opcode.
3723

3724
    """
3725
    nics = [n.ToDict() for n in self.nics]
3726
    ial = IAllocator(self,
3727
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3728
                     name=self.op.instance_name,
3729
                     disk_template=self.op.disk_template,
3730
                     tags=[],
3731
                     os=self.op.os_type,
3732
                     vcpus=self.be_full[constants.BE_VCPUS],
3733
                     mem_size=self.be_full[constants.BE_MEMORY],
3734
                     disks=self.disks,
3735
                     nics=nics,
3736
                     hypervisor=self.op.hypervisor,
3737
                     )
3738

    
3739
    ial.Run(self.op.iallocator)
3740

    
3741
    if not ial.success:
3742
      raise errors.OpPrereqError("Can't compute nodes using"
3743
                                 " iallocator '%s': %s" % (self.op.iallocator,
3744
                                                           ial.info))
3745
    if len(ial.nodes) != ial.required_nodes:
3746
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3747
                                 " of nodes (%s), required %s" %
3748
                                 (self.op.iallocator, len(ial.nodes),
3749
                                  ial.required_nodes))
3750
    self.op.pnode = ial.nodes[0]
3751
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3752
                 self.op.instance_name, self.op.iallocator,
3753
                 ", ".join(ial.nodes))
3754
    if ial.required_nodes == 2:
3755
      self.op.snode = ial.nodes[1]
3756

    
3757
  def BuildHooksEnv(self):
3758
    """Build hooks env.
3759

3760
    This runs on master, primary and secondary nodes of the instance.
3761

3762
    """
3763
    env = {
3764
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3765
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3766
      "INSTANCE_ADD_MODE": self.op.mode,
3767
      }
3768
    if self.op.mode == constants.INSTANCE_IMPORT:
3769
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3770
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3771
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3772

    
3773
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3774
      primary_node=self.op.pnode,
3775
      secondary_nodes=self.secondaries,
3776
      status=self.instance_status,
3777
      os_type=self.op.os_type,
3778
      memory=self.be_full[constants.BE_MEMORY],
3779
      vcpus=self.be_full[constants.BE_VCPUS],
3780
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3781
    ))
3782

    
3783
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3784
          self.secondaries)
3785
    return env, nl, nl
3786

    
3787

    
3788
  def CheckPrereq(self):
3789
    """Check prerequisites.
3790

3791
    """
3792
    if (not self.cfg.GetVGName() and
3793
        self.op.disk_template not in constants.DTS_NOT_LVM):
3794
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3795
                                 " instances")
3796

    
3797

    
3798
    if self.op.mode == constants.INSTANCE_IMPORT:
3799
      src_node = self.op.src_node
3800
      src_path = self.op.src_path
3801

    
3802
      if src_node is None:
3803
        exp_list = self.rpc.call_export_list(
3804
          self.acquired_locks[locking.LEVEL_NODE])
3805
        found = False
3806
        for node in exp_list:
3807
          if not exp_list[node].failed and src_path in exp_list[node].data:
3808
            found = True
3809
            self.op.src_node = src_node = node
3810
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3811
                                                       src_path)
3812
            break
3813
        if not found:
3814
          raise errors.OpPrereqError("No export found for relative path %s" %
3815
                                      src_path)
3816

    
3817
      result = self.rpc.call_export_info(src_node, src_path)
3818
      result.Raise()
3819
      if not result.data:
3820
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3821

    
3822
      export_info = result.data
3823
      if not export_info.has_section(constants.INISECT_EXP):
3824
        raise errors.ProgrammerError("Corrupted export config")
3825

    
3826
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3827
      if (int(ei_version) != constants.EXPORT_VERSION):
3828
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3829
                                   (ei_version, constants.EXPORT_VERSION))
3830

    
3831
      # Check that the new instance doesn't have less disks than the export
3832
      instance_disks = len(self.disks)
3833
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3834
      if instance_disks < export_disks:
3835
        raise errors.OpPrereqError("Not enough disks to import."
3836
                                   " (instance: %d, export: %d)" %
3837
                                   (instance_disks, export_disks))
3838

    
3839
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3840
      disk_images = []
3841
      for idx in range(export_disks):
3842
        option = 'disk%d_dump' % idx
3843
        if export_info.has_option(constants.INISECT_INS, option):
3844
          # FIXME: are the old os-es, disk sizes, etc. useful?
3845
          export_name = export_info.get(constants.INISECT_INS, option)
3846
          image = os.path.join(src_path, export_name)
3847
          disk_images.append(image)
3848
        else:
3849
          disk_images.append(False)
3850

    
3851
      self.src_images = disk_images
3852

    
3853
      old_name = export_info.get(constants.INISECT_INS, 'name')
3854
      # FIXME: int() here could throw a ValueError on broken exports
3855
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3856
      if self.op.instance_name == old_name:
3857
        for idx, nic in enumerate(self.nics):
3858
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3859
            nic_mac_ini = 'nic%d_mac' % idx
3860
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3861

    
3862
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3863
    if self.op.start and not self.op.ip_check:
3864
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3865
                                 " adding an instance in start mode")
3866

    
3867
    if self.op.ip_check:
3868
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3869
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3870
                                   (self.check_ip, self.op.instance_name))
3871

    
3872
    #### allocator run
3873

    
3874
    if self.op.iallocator is not None:
3875
      self._RunAllocator()
3876

    
3877
    #### node related checks
3878

    
3879
    # check primary node
3880
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3881
    assert self.pnode is not None, \
3882
      "Cannot retrieve locked node %s" % self.op.pnode
3883
    self.secondaries = []
3884

    
3885
    # mirror node verification
3886
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3887
      if self.op.snode is None:
3888
        raise errors.OpPrereqError("The networked disk templates need"
3889
                                   " a mirror node")
3890
      if self.op.snode == pnode.name:
3891
        raise errors.OpPrereqError("The secondary node cannot be"
3892
                                   " the primary node.")
3893
      self.secondaries.append(self.op.snode)
3894

    
3895
    nodenames = [pnode.name] + self.secondaries
3896

    
3897
    req_size = _ComputeDiskSize(self.op.disk_template,
3898
                                self.disks)
3899

    
3900
    # Check lv size requirements
3901
    if req_size is not None:
3902
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3903
                                         self.op.hypervisor)
3904
      for node in nodenames:
3905
        info = nodeinfo[node]
3906
        info.Raise()
3907
        info = info.data
3908
        if not info:
3909
          raise errors.OpPrereqError("Cannot get current information"
3910
                                     " from node '%s'" % node)
3911
        vg_free = info.get('vg_free', None)
3912
        if not isinstance(vg_free, int):
3913
          raise errors.OpPrereqError("Can't compute free disk space on"
3914
                                     " node %s" % node)
3915
        if req_size > info['vg_free']:
3916
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3917
                                     " %d MB available, %d MB required" %
3918
                                     (node, info['vg_free'], req_size))
3919

    
3920
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3921

    
3922
    # os verification
3923
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3924
    result.Raise()
3925
    if not isinstance(result.data, objects.OS):
3926
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3927
                                 " primary node"  % self.op.os_type)
3928

    
3929
    # bridge check on primary node
3930
    bridges = [n.bridge for n in self.nics]
3931
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3932
    result.Raise()
3933
    if not result.data:
3934
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
3935
                                 " exist on destination node '%s'" %
3936
                                 (",".join(bridges), pnode.name))
3937

    
3938
    # memory check on primary node
3939
    if self.op.start:
3940
      _CheckNodeFreeMemory(self, self.pnode.name,
3941
                           "creating instance %s" % self.op.instance_name,
3942
                           self.be_full[constants.BE_MEMORY],
3943
                           self.op.hypervisor)
3944

    
3945
    if self.op.start:
3946
      self.instance_status = 'up'
3947
    else:
3948
      self.instance_status = 'down'
3949

    
3950
  def Exec(self, feedback_fn):
3951
    """Create and add the instance to the cluster.
3952

3953
    """
3954
    instance = self.op.instance_name
3955
    pnode_name = self.pnode.name
3956

    
3957
    for nic in self.nics:
3958
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3959
        nic.mac = self.cfg.GenerateMAC()
3960

    
3961
    ht_kind = self.op.hypervisor
3962
    if ht_kind in constants.HTS_REQ_PORT:
3963
      network_port = self.cfg.AllocatePort()
3964
    else:
3965
      network_port = None
3966

    
3967
    ##if self.op.vnc_bind_address is None:
3968
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3969

    
3970
    # this is needed because os.path.join does not accept None arguments
3971
    if self.op.file_storage_dir is None:
3972
      string_file_storage_dir = ""
3973
    else:
3974
      string_file_storage_dir = self.op.file_storage_dir
3975

    
3976
    # build the full file storage dir path
3977
    file_storage_dir = os.path.normpath(os.path.join(
3978
                                        self.cfg.GetFileStorageDir(),
3979
                                        string_file_storage_dir, instance))
3980

    
3981

    
3982
    disks = _GenerateDiskTemplate(self,
3983
                                  self.op.disk_template,
3984
                                  instance, pnode_name,
3985
                                  self.secondaries,
3986
                                  self.disks,
3987
                                  file_storage_dir,
3988
                                  self.op.file_driver,
3989
                                  0)
3990

    
3991
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3992
                            primary_node=pnode_name,
3993
                            nics=self.nics, disks=disks,
3994
                            disk_template=self.op.disk_template,
3995
                            status=self.instance_status,
3996
                            network_port=network_port,
3997
                            beparams=self.op.beparams,
3998
                            hvparams=self.op.hvparams,
3999
                            hypervisor=self.op.hypervisor,
4000
                            )
4001

    
4002
    feedback_fn("* creating instance disks...")
4003
    if not _CreateDisks(self, iobj):
4004
      _RemoveDisks(self, iobj)
4005
      self.cfg.ReleaseDRBDMinors(instance)
4006
      raise errors.OpExecError("Device creation failed, reverting...")
4007

    
4008
    feedback_fn("adding instance %s to cluster config" % instance)
4009

    
4010
    self.cfg.AddInstance(iobj)
4011
    # Declare that we don't want to remove the instance lock anymore, as we've
4012
    # added the instance to the config
4013
    del self.remove_locks[locking.LEVEL_INSTANCE]
4014
    # Remove the temp. assignements for the instance's drbds
4015
    self.cfg.ReleaseDRBDMinors(instance)
4016
    # Unlock all the nodes
4017
    if self.op.mode == constants.INSTANCE_IMPORT:
4018
      nodes_keep = [self.op.src_node]
4019
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4020
                       if node != self.op.src_node]
4021
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4022
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4023
    else:
4024
      self.context.glm.release(locking.LEVEL_NODE)
4025
      del self.acquired_locks[locking.LEVEL_NODE]
4026

    
4027
    if self.op.wait_for_sync:
4028
      disk_abort = not _WaitForSync(self, iobj)
4029
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4030
      # make sure the disks are not degraded (still sync-ing is ok)
4031
      time.sleep(15)
4032
      feedback_fn("* checking mirrors status")
4033
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4034
    else:
4035
      disk_abort = False
4036

    
4037
    if disk_abort:
4038
      _RemoveDisks(self, iobj)
4039
      self.cfg.RemoveInstance(iobj.name)
4040
      # Make sure the instance lock gets removed
4041
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4042
      raise errors.OpExecError("There are some degraded disks for"
4043
                               " this instance")
4044

    
4045
    feedback_fn("creating os for instance %s on node %s" %
4046
                (instance, pnode_name))
4047

    
4048
    if iobj.disk_template != constants.DT_DISKLESS:
4049
      if self.op.mode == constants.INSTANCE_CREATE:
4050
        feedback_fn("* running the instance OS create scripts...")
4051
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4052
        result.Raise()
4053
        if not result.data:
4054
          raise errors.OpExecError("Could not add os for instance %s"
4055
                                   " on node %s" %
4056
                                   (instance, pnode_name))
4057

    
4058
      elif self.op.mode == constants.INSTANCE_IMPORT:
4059
        feedback_fn("* running the instance OS import scripts...")
4060
        src_node = self.op.src_node
4061
        src_images = self.src_images
4062
        cluster_name = self.cfg.GetClusterName()
4063
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4064
                                                         src_node, src_images,
4065
                                                         cluster_name)
4066
        import_result.Raise()
4067
        for idx, result in enumerate(import_result.data):
4068
          if not result:
4069
            self.LogWarning("Could not import the image %s for instance"
4070
                            " %s, disk %d, on node %s" %
4071
                            (src_images[idx], instance, idx, pnode_name))
4072
      else:
4073
        # also checked in the prereq part
4074
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4075
                                     % self.op.mode)
4076

    
4077
    if self.op.start:
4078
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4079
      feedback_fn("* starting instance...")
4080
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
4081
      result.Raise()
4082
      if not result.data:
4083
        raise errors.OpExecError("Could not start instance")
4084

    
4085

    
4086
class LUConnectConsole(NoHooksLU):
4087
  """Connect to an instance's console.
4088

4089
  This is somewhat special in that it returns the command line that
4090
  you need to run on the master node in order to connect to the
4091
  console.
4092

4093
  """
4094
  _OP_REQP = ["instance_name"]
4095
  REQ_BGL = False
4096

    
4097
  def ExpandNames(self):
4098
    self._ExpandAndLockInstance()
4099

    
4100
  def CheckPrereq(self):
4101
    """Check prerequisites.
4102

4103
    This checks that the instance is in the cluster.
4104

4105
    """
4106
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4107
    assert self.instance is not None, \
4108
      "Cannot retrieve locked instance %s" % self.op.instance_name
4109

    
4110
  def Exec(self, feedback_fn):
4111
    """Connect to the console of an instance
4112

4113
    """
4114
    instance = self.instance
4115
    node = instance.primary_node
4116

    
4117
    node_insts = self.rpc.call_instance_list([node],
4118
                                             [instance.hypervisor])[node]
4119
    node_insts.Raise()
4120

    
4121
    if instance.name not in node_insts.data:
4122
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4123

    
4124
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4125

    
4126
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4127
    console_cmd = hyper.GetShellCommandForConsole(instance)
4128

    
4129
    # build ssh cmdline
4130
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4131

    
4132

    
4133
class LUReplaceDisks(LogicalUnit):
4134
  """Replace the disks of an instance.
4135

4136
  """
4137
  HPATH = "mirrors-replace"
4138
  HTYPE = constants.HTYPE_INSTANCE
4139
  _OP_REQP = ["instance_name", "mode", "disks"]
4140
  REQ_BGL = False
4141

    
4142
  def ExpandNames(self):
4143
    self._ExpandAndLockInstance()
4144

    
4145
    if not hasattr(self.op, "remote_node"):
4146
      self.op.remote_node = None
4147

    
4148
    ia_name = getattr(self.op, "iallocator", None)
4149
    if ia_name is not None:
4150
      if self.op.remote_node is not None:
4151
        raise errors.OpPrereqError("Give either the iallocator or the new"
4152
                                   " secondary, not both")
4153
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4154
    elif self.op.remote_node is not None:
4155
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4156
      if remote_node is None:
4157
        raise errors.OpPrereqError("Node '%s' not known" %
4158
                                   self.op.remote_node)
4159
      self.op.remote_node = remote_node
4160
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4161
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4162
    else:
4163
      self.needed_locks[locking.LEVEL_NODE] = []
4164
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4165

    
4166
  def DeclareLocks(self, level):
4167
    # If we're not already locking all nodes in the set we have to declare the
4168
    # instance's primary/secondary nodes.
4169
    if (level == locking.LEVEL_NODE and
4170
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4171
      self._LockInstancesNodes()
4172

    
4173
  def _RunAllocator(self):
4174
    """Compute a new secondary node using an IAllocator.
4175

4176
    """
4177
    ial = IAllocator(self,
4178
                     mode=constants.IALLOCATOR_MODE_RELOC,
4179
                     name=self.op.instance_name,
4180
                     relocate_from=[self.sec_node])
4181

    
4182
    ial.Run(self.op.iallocator)
4183

    
4184
    if not ial.success:
4185
      raise errors.OpPrereqError("Can't compute nodes using"
4186
                                 " iallocator '%s': %s" % (self.op.iallocator,
4187
                                                           ial.info))
4188
    if len(ial.nodes) != ial.required_nodes:
4189
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4190
                                 " of nodes (%s), required %s" %
4191
                                 (len(ial.nodes), ial.required_nodes))
4192
    self.op.remote_node = ial.nodes[0]
4193
    self.LogInfo("Selected new secondary for the instance: %s",
4194
                 self.op.remote_node)
4195

    
4196
  def BuildHooksEnv(self):
4197
    """Build hooks env.
4198

4199
    This runs on the master, the primary and all the secondaries.
4200

4201
    """
4202
    env = {
4203
      "MODE": self.op.mode,
4204
      "NEW_SECONDARY": self.op.remote_node,
4205
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4206
      }
4207
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4208
    nl = [
4209
      self.cfg.GetMasterNode(),
4210
      self.instance.primary_node,
4211
      ]
4212
    if self.op.remote_node is not None:
4213
      nl.append(self.op.remote_node)
4214
    return env, nl, nl
4215

    
4216
  def CheckPrereq(self):
4217
    """Check prerequisites.
4218

4219
    This checks that the instance is in the cluster.
4220

4221
    """
4222
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4223
    assert instance is not None, \
4224
      "Cannot retrieve locked instance %s" % self.op.instance_name
4225
    self.instance = instance
4226

    
4227
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4228
      raise errors.OpPrereqError("Instance's disk layout is not"
4229
                                 " network mirrored.")
4230

    
4231
    if len(instance.secondary_nodes) != 1:
4232
      raise errors.OpPrereqError("The instance has a strange layout,"
4233
                                 " expected one secondary but found %d" %
4234
                                 len(instance.secondary_nodes))
4235

    
4236
    self.sec_node = instance.secondary_nodes[0]
4237

    
4238
    ia_name = getattr(self.op, "iallocator", None)
4239
    if ia_name is not None:
4240
      self._RunAllocator()
4241

    
4242
    remote_node = self.op.remote_node
4243
    if remote_node is not None:
4244
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4245
      assert self.remote_node_info is not None, \
4246
        "Cannot retrieve locked node %s" % remote_node
4247
    else:
4248
      self.remote_node_info = None
4249
    if remote_node == instance.primary_node:
4250
      raise errors.OpPrereqError("The specified node is the primary node of"
4251
                                 " the instance.")
4252
    elif remote_node == self.sec_node:
4253
      if self.op.mode == constants.REPLACE_DISK_SEC:
4254
        # this is for DRBD8, where we can't execute the same mode of
4255
        # replacement as for drbd7 (no different port allocated)
4256
        raise errors.OpPrereqError("Same secondary given, cannot execute"
4257
                                   " replacement")
4258
    if instance.disk_template == constants.DT_DRBD8:
4259
      if (self.op.mode == constants.REPLACE_DISK_ALL and
4260
          remote_node is not None):
4261
        # switch to replace secondary mode
4262
        self.op.mode = constants.REPLACE_DISK_SEC
4263

    
4264
      if self.op.mode == constants.REPLACE_DISK_ALL:
4265
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4266
                                   " secondary disk replacement, not"
4267
                                   " both at once")
4268
      elif self.op.mode == constants.REPLACE_DISK_PRI:
4269
        if remote_node is not None:
4270
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4271
                                     " the secondary while doing a primary"
4272
                                     " node disk replacement")
4273
        self.tgt_node = instance.primary_node
4274
        self.oth_node = instance.secondary_nodes[0]
4275
      elif self.op.mode == constants.REPLACE_DISK_SEC:
4276
        self.new_node = remote_node # this can be None, in which case
4277
                                    # we don't change the secondary
4278
        self.tgt_node = instance.secondary_nodes[0]
4279
        self.oth_node = instance.primary_node
4280
      else:
4281
        raise errors.ProgrammerError("Unhandled disk replace mode")
4282

    
4283
    if not self.op.disks:
4284
      self.op.disks = range(len(instance.disks))
4285

    
4286
    for disk_idx in self.op.disks:
4287
      instance.FindDisk(disk_idx)
4288

    
4289
  def _ExecD8DiskOnly(self, feedback_fn):
4290
    """Replace a disk on the primary or secondary for dbrd8.
4291

4292
    The algorithm for replace is quite complicated:
4293

4294
      1. for each disk to be replaced:
4295

4296
        1. create new LVs on the target node with unique names
4297
        1. detach old LVs from the drbd device
4298
        1. rename old LVs to name_replaced.<time_t>
4299
        1. rename new LVs to old LVs
4300
        1. attach the new LVs (with the old names now) to the drbd device
4301

4302
      1. wait for sync across all devices
4303

4304
      1. for each modified disk:
4305

4306
        1. remove old LVs (which have the name name_replaces.<time_t>)
4307

4308
    Failures are not very well handled.
4309

4310
    """
4311
    steps_total = 6
4312
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4313
    instance = self.instance
4314
    iv_names = {}
4315
    vgname = self.cfg.GetVGName()
4316
    # start of work
4317
    cfg = self.cfg
4318
    tgt_node = self.tgt_node
4319
    oth_node = self.oth_node
4320

    
4321
    # Step: check device activation
4322
    self.proc.LogStep(1, steps_total, "check device existence")
4323
    info("checking volume groups")
4324
    my_vg = cfg.GetVGName()
4325
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4326
    if not results:
4327
      raise errors.OpExecError("Can't list volume groups on the nodes")
4328
    for node in oth_node, tgt_node:
4329
      res = results[node]
4330
      if res.failed or not res.data or my_vg not in res.data:
4331
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4332
                                 (my_vg, node))
4333
    for idx, dev in enumerate(instance.disks):
4334
      if idx not in self.op.disks:
4335
        continue
4336
      for node in tgt_node, oth_node:
4337
        info("checking disk/%d on %s" % (idx, node))
4338
        cfg.SetDiskID(dev, node)
4339
        if not self.rpc.call_blockdev_find(node, dev):
4340
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4341
                                   (idx, node))
4342

    
4343
    # Step: check other node consistency
4344
    self.proc.LogStep(2, steps_total, "check peer consistency")
4345
    for idx, dev in enumerate(instance.disks):
4346
      if idx not in self.op.disks:
4347
        continue
4348
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4349
      if not _CheckDiskConsistency(self, dev, oth_node,
4350
                                   oth_node==instance.primary_node):
4351
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4352
                                 " to replace disks on this node (%s)" %
4353
                                 (oth_node, tgt_node))
4354

    
4355
    # Step: create new storage
4356
    self.proc.LogStep(3, steps_total, "allocate new storage")
4357
    for idx, dev in enumerate(instance.disks):
4358
      if idx not in self.op.disks:
4359
        continue
4360
      size = dev.size
4361
      cfg.SetDiskID(dev, tgt_node)
4362
      lv_names = [".disk%d_%s" % (idx, suf)
4363
                  for suf in ["data", "meta"]]
4364
      names = _GenerateUniqueNames(self, lv_names)
4365
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4366
                             logical_id=(vgname, names[0]))
4367
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4368
                             logical_id=(vgname, names[1]))
4369
      new_lvs = [lv_data, lv_meta]
4370
      old_lvs = dev.children
4371
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4372
      info("creating new local storage on %s for %s" %
4373
           (tgt_node, dev.iv_name))
4374
      # since we *always* want to create this LV, we use the
4375
      # _Create...OnPrimary (which forces the creation), even if we
4376
      # are talking about the secondary node
4377
      for new_lv in new_lvs:
4378
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4379
                                        _GetInstanceInfoText(instance)):
4380
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4381
                                   " node '%s'" %
4382
                                   (new_lv.logical_id[1], tgt_node))
4383

    
4384
    # Step: for each lv, detach+rename*2+attach
4385
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4386
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4387
      info("detaching %s drbd from local storage" % dev.iv_name)
4388
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4389
      result.Raise()
4390
      if not result.data:
4391
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4392
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4393
      #dev.children = []
4394
      #cfg.Update(instance)
4395

    
4396
      # ok, we created the new LVs, so now we know we have the needed
4397
      # storage; as such, we proceed on the target node to rename
4398
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4399
      # using the assumption that logical_id == physical_id (which in
4400
      # turn is the unique_id on that node)
4401

    
4402
      # FIXME(iustin): use a better name for the replaced LVs
4403
      temp_suffix = int(time.time())
4404
      ren_fn = lambda d, suff: (d.physical_id[0],
4405
                                d.physical_id[1] + "_replaced-%s" % suff)
4406
      # build the rename list based on what LVs exist on the node
4407
      rlist = []
4408
      for to_ren in old_lvs:
4409
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4410
        if not find_res.failed and find_res.data is not None: # device exists
4411
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4412

    
4413
      info("renaming the old LVs on the target node")
4414
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4415
      result.Raise()
4416
      if not result.data:
4417
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4418
      # now we rename the new LVs to the old LVs
4419
      info("renaming the new LVs on the target node")
4420
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4421
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4422
      result.Raise()
4423
      if not result.data:
4424
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4425

    
4426
      for old, new in zip(old_lvs, new_lvs):
4427
        new.logical_id = old.logical_id
4428
        cfg.SetDiskID(new, tgt_node)
4429

    
4430
      for disk in old_lvs:
4431
        disk.logical_id = ren_fn(disk, temp_suffix)
4432
        cfg.SetDiskID(disk, tgt_node)
4433

    
4434
      # now that the new lvs have the old name, we can add them to the device
4435
      info("adding new mirror component on %s" % tgt_node)
4436
      result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4437
      if result.failed or not result.data:
4438
        for new_lv in new_lvs:
4439
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4440
          if result.failed or not result.data:
4441
            warning("Can't rollback device %s", hint="manually cleanup unused"
4442
                    " logical volumes")
4443
        raise errors.OpExecError("Can't add local storage to drbd")
4444

    
4445
      dev.children = new_lvs
4446
      cfg.Update(instance)
4447

    
4448
    # Step: wait for sync
4449

    
4450
    # this can fail as the old devices are degraded and _WaitForSync
4451
    # does a combined result over all disks, so we don't check its
4452
    # return value
4453
    self.proc.LogStep(5, steps_total, "sync devices")
4454
    _WaitForSync(self, instance, unlock=True)
4455

    
4456
    # so check manually all the devices
4457
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4458
      cfg.SetDiskID(dev, instance.primary_node)
4459
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4460
      if result.failed or result.data[5]:
4461
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4462

    
4463
    # Step: remove old storage
4464
    self.proc.LogStep(6, steps_total, "removing old storage")
4465
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4466
      info("remove logical volumes for %s" % name)
4467
      for lv in old_lvs:
4468
        cfg.SetDiskID(lv, tgt_node)
4469
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
4470
        if result.failed or not result.data:
4471
          warning("Can't remove old LV", hint="manually remove unused LVs")
4472
          continue
4473

    
4474
  def _ExecD8Secondary(self, feedback_fn):
4475
    """Replace the secondary node for drbd8.
4476

4477
    The algorithm for replace is quite complicated:
4478
      - for all disks of the instance:
4479
        - create new LVs on the new node with same names
4480
        - shutdown the drbd device on the old secondary
4481
        - disconnect the drbd network on the primary
4482
        - create the drbd device on the new secondary
4483
        - network attach the drbd on the primary, using an artifice:
4484
          the drbd code for Attach() will connect to the network if it
4485
          finds a device which is connected to the good local disks but
4486
          not network enabled
4487
      - wait for sync across all devices
4488
      - remove all disks from the old secondary
4489

4490
    Failures are not very well handled.
4491

4492
    """
4493
    steps_total = 6
4494
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4495
    instance = self.instance
4496
    iv_names = {}
4497
    vgname = self.cfg.GetVGName()
4498
    # start of work
4499
    cfg = self.cfg
4500
    old_node = self.tgt_node
4501
    new_node = self.new_node
4502
    pri_node = instance.primary_node
4503

    
4504
    # Step: check device activation
4505
    self.proc.LogStep(1, steps_total, "check device existence")
4506
    info("checking volume groups")
4507
    my_vg = cfg.GetVGName()
4508
    results = self.rpc.call_vg_list([pri_node, new_node])
4509
    for node in pri_node, new_node:
4510
      res = results[node]
4511
      if res.failed or not res.data or my_vg not in res.data:
4512
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4513
                                 (my_vg, node))
4514
    for idx, dev in enumerate(instance.disks):
4515
      if idx not in self.op.disks:
4516
        continue
4517
      info("checking disk/%d on %s" % (idx, pri_node))
4518
      cfg.SetDiskID(dev, pri_node)
4519
      result = self.rpc.call_blockdev_find(pri_node, dev)
4520
      result.Raise()
4521
      if not result.data:
4522
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4523
                                 (idx, pri_node))
4524

    
4525
    # Step: check other node consistency
4526
    self.proc.LogStep(2, steps_total, "check peer consistency")
4527
    for idx, dev in enumerate(instance.disks):
4528
      if idx not in self.op.disks:
4529
        continue
4530
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4531
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4532
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4533
                                 " unsafe to replace the secondary" %
4534
                                 pri_node)
4535

    
4536
    # Step: create new storage
4537
    self.proc.LogStep(3, steps_total, "allocate new storage")
4538
    for idx, dev in enumerate(instance.disks):
4539
      size = dev.size
4540
      info("adding new local storage on %s for disk/%d" %
4541
           (new_node, idx))
4542
      # since we *always* want to create this LV, we use the
4543
      # _Create...OnPrimary (which forces the creation), even if we
4544
      # are talking about the secondary node
4545
      for new_lv in dev.children:
4546
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4547
                                        _GetInstanceInfoText(instance)):
4548
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4549
                                   " node '%s'" %
4550
                                   (new_lv.logical_id[1], new_node))
4551

    
4552
    # Step 4: dbrd minors and drbd setups changes
4553
    # after this, we must manually remove the drbd minors on both the
4554
    # error and the success paths
4555
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4556
                                   instance.name)
4557
    logging.debug("Allocated minors %s" % (minors,))
4558
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4559
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4560
      size = dev.size
4561
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4562
      # create new devices on new_node
4563
      if pri_node == dev.logical_id[0]:
4564
        new_logical_id = (pri_node, new_node,
4565
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4566
                          dev.logical_id[5])
4567
      else:
4568
        new_logical_id = (new_node, pri_node,
4569
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4570
                          dev.logical_id[5])
4571
      iv_names[idx] = (dev, dev.children, new_logical_id)
4572
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4573
                    new_logical_id)
4574
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4575
                              logical_id=new_logical_id,
4576
                              children=dev.children)
4577
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4578
                                        new_drbd, False,
4579
                                        _GetInstanceInfoText(instance)):
4580
        self.cfg.ReleaseDRBDMinors(instance.name)
4581
        raise errors.OpExecError("Failed to create new DRBD on"
4582
                                 " node '%s'" % new_node)
4583

    
4584
    for idx, dev in enumerate(instance.disks):
4585
      # we have new devices, shutdown the drbd on the old secondary
4586
      info("shutting down drbd for disk/%d on old node" % idx)
4587
      cfg.SetDiskID(dev, old_node)
4588
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
4589
      if result.failed or not result.data:
4590
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4591
                hint="Please cleanup this device manually as soon as possible")
4592

    
4593
    info("detaching primary drbds from the network (=> standalone)")
4594
    done = 0
4595
    for idx, dev in enumerate(instance.disks):
4596
      cfg.SetDiskID(dev, pri_node)
4597
      # set the network part of the physical (unique in bdev terms) id
4598
      # to None, meaning detach from network
4599
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4600
      # and 'find' the device, which will 'fix' it to match the
4601
      # standalone state
4602
      result = self.rpc.call_blockdev_find(pri_node, dev)
4603
      if not result.failed and result.data:
4604
        done += 1
4605
      else:
4606
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4607
                idx)
4608

    
4609
    if not done:
4610
      # no detaches succeeded (very unlikely)
4611
      self.cfg.ReleaseDRBDMinors(instance.name)
4612
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4613

    
4614
    # if we managed to detach at least one, we update all the disks of
4615
    # the instance to point to the new secondary
4616
    info("updating instance configuration")
4617
    for dev, _, new_logical_id in iv_names.itervalues():
4618
      dev.logical_id = new_logical_id
4619
      cfg.SetDiskID(dev, pri_node)
4620
    cfg.Update(instance)
4621
    # we can remove now the temp minors as now the new values are
4622
    # written to the config file (and therefore stable)
4623
    self.cfg.ReleaseDRBDMinors(instance.name)
4624

    
4625
    # and now perform the drbd attach
4626
    info("attaching primary drbds to new secondary (standalone => connected)")
4627
    failures = []
4628
    for idx, dev in enumerate(instance.disks):
4629
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4630
      # since the attach is smart, it's enough to 'find' the device,
4631
      # it will automatically activate the network, if the physical_id
4632
      # is correct
4633
      cfg.SetDiskID(dev, pri_node)
4634
      logging.debug("Disk to attach: %s", dev)
4635
      result = self.rpc.call_blockdev_find(pri_node, dev)
4636
      if result.failed or not result.data:
4637
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4638
                "please do a gnt-instance info to see the status of disks")
4639

    
4640
    # this can fail as the old devices are degraded and _WaitForSync
4641
    # does a combined result over all disks, so we don't check its
4642
    # return value
4643
    self.proc.LogStep(5, steps_total, "sync devices")
4644
    _WaitForSync(self, instance, unlock=True)
4645

    
4646
    # so check manually all the devices
4647
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4648
      cfg.SetDiskID(dev, pri_node)
4649
      result = self.rpc.call_blockdev_find(pri_node, dev)
4650
      result.Raise()
4651
      if result.data[5]:
4652
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4653

    
4654
    self.proc.LogStep(6, steps_total, "removing old storage")
4655
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4656
      info("remove logical volumes for disk/%d" % idx)
4657
      for lv in old_lvs:
4658
        cfg.SetDiskID(lv, old_node)
4659
        result = self.rpc.call_blockdev_remove(old_node, lv)
4660
        if result.failed or not result.data:
4661
          warning("Can't remove LV on old secondary",
4662
                  hint="Cleanup stale volumes by hand")
4663

    
4664
  def Exec(self, feedback_fn):
4665
    """Execute disk replacement.
4666

4667
    This dispatches the disk replacement to the appropriate handler.
4668

4669
    """
4670
    instance = self.instance
4671

    
4672
    # Activate the instance disks if we're replacing them on a down instance
4673
    if instance.status == "down":
4674
      _StartInstanceDisks(self, instance, True)
4675

    
4676
    if instance.disk_template == constants.DT_DRBD8:
4677
      if self.op.remote_node is None:
4678
        fn = self._ExecD8DiskOnly
4679
      else:
4680
        fn = self._ExecD8Secondary
4681
    else:
4682
      raise errors.ProgrammerError("Unhandled disk replacement case")
4683

    
4684
    ret = fn(feedback_fn)
4685

    
4686
    # Deactivate the instance disks if we're replacing them on a down instance
4687
    if instance.status == "down":
4688
      _SafeShutdownInstanceDisks(self, instance)
4689

    
4690
    return ret
4691

    
4692

    
4693
class LUGrowDisk(LogicalUnit):
4694
  """Grow a disk of an instance.
4695

4696
  """
4697
  HPATH = "disk-grow"
4698
  HTYPE = constants.HTYPE_INSTANCE
4699
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4700
  REQ_BGL = False
4701

    
4702
  def ExpandNames(self):
4703
    self._ExpandAndLockInstance()
4704
    self.needed_locks[locking.LEVEL_NODE] = []
4705
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4706

    
4707
  def DeclareLocks(self, level):
4708
    if level == locking.LEVEL_NODE:
4709
      self._LockInstancesNodes()
4710

    
4711
  def BuildHooksEnv(self):
4712
    """Build hooks env.
4713

4714
    This runs on the master, the primary and all the secondaries.
4715

4716
    """
4717
    env = {
4718
      "DISK": self.op.disk,
4719
      "AMOUNT": self.op.amount,
4720
      }
4721
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4722
    nl = [
4723
      self.cfg.GetMasterNode(),
4724
      self.instance.primary_node,
4725
      ]
4726
    return env, nl, nl
4727

    
4728
  def CheckPrereq(self):
4729
    """Check prerequisites.
4730

4731
    This checks that the instance is in the cluster.
4732

4733
    """
4734
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4735
    assert instance is not None, \
4736
      "Cannot retrieve locked instance %s" % self.op.instance_name
4737

    
4738
    self.instance = instance
4739

    
4740
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4741
      raise errors.OpPrereqError("Instance's disk layout does not support"
4742
                                 " growing.")
4743

    
4744
    self.disk = instance.FindDisk(self.op.disk)
4745

    
4746
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4747
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4748
                                       instance.hypervisor)
4749
    for node in nodenames:
4750
      info = nodeinfo[node]
4751
      if info.failed or not info.data:
4752
        raise errors.OpPrereqError("Cannot get current information"
4753
                                   " from node '%s'" % node)
4754
      vg_free = info.data.get('vg_free', None)
4755
      if not isinstance(vg_free, int):
4756
        raise errors.OpPrereqError("Can't compute free disk space on"
4757
                                   " node %s" % node)
4758
      if self.op.amount > vg_free:
4759
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4760
                                   " %d MiB available, %d MiB required" %
4761
                                   (node, vg_free, self.op.amount))
4762

    
4763
  def Exec(self, feedback_fn):
4764
    """Execute disk grow.
4765

4766
    """
4767
    instance = self.instance
4768
    disk = self.disk
4769
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4770
      self.cfg.SetDiskID(disk, node)
4771
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4772
      result.Raise()
4773
      if (not result.data or not isinstance(result.data, (list, tuple)) or
4774
          len(result.data) != 2):
4775
        raise errors.OpExecError("Grow request failed to node %s" % node)
4776
      elif not result.data[0]:
4777
        raise errors.OpExecError("Grow request failed to node %s: %s" %
4778
                                 (node, result.data[1]))
4779
    disk.RecordGrow(self.op.amount)
4780
    self.cfg.Update(instance)
4781
    if self.op.wait_for_sync:
4782
      disk_abort = not _WaitForSync(self, instance)
4783
      if disk_abort:
4784
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4785
                             " status.\nPlease check the instance.")
4786

    
4787

    
4788
class LUQueryInstanceData(NoHooksLU):
4789
  """Query runtime instance data.
4790

4791
  """
4792
  _OP_REQP = ["instances", "static"]
4793
  REQ_BGL = False
4794

    
4795
  def ExpandNames(self):
4796
    self.needed_locks = {}
4797
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4798

    
4799
    if not isinstance(self.op.instances, list):
4800
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4801

    
4802
    if self.op.instances:
4803
      self.wanted_names = []
4804
      for name in self.op.instances:
4805
        full_name = self.cfg.ExpandInstanceName(name)
4806
        if full_name is None:
4807
          raise errors.OpPrereqError("Instance '%s' not known" %
4808
                                     self.op.instance_name)
4809
        self.wanted_names.append(full_name)
4810
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4811
    else:
4812
      self.wanted_names = None
4813
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4814

    
4815
    self.needed_locks[locking.LEVEL_NODE] = []
4816
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4817

    
4818
  def DeclareLocks(self, level):
4819
    if level == locking.LEVEL_NODE:
4820
      self._LockInstancesNodes()
4821

    
4822
  def CheckPrereq(self):
4823
    """Check prerequisites.
4824

4825
    This only checks the optional instance list against the existing names.
4826

4827
    """
4828
    if self.wanted_names is None:
4829
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4830

    
4831
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4832
                             in self.wanted_names]
4833
    return
4834

    
4835
  def _ComputeDiskStatus(self, instance, snode, dev):
4836
    """Compute block device status.
4837

4838
    """
4839
    static = self.op.static
4840
    if not static:
4841
      self.cfg.SetDiskID(dev, instance.primary_node)
4842
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4843
      dev_pstatus.Raise()
4844
      dev_pstatus = dev_pstatus.data
4845
    else:
4846
      dev_pstatus = None
4847

    
4848
    if dev.dev_type in constants.LDS_DRBD:
4849
      # we change the snode then (otherwise we use the one passed in)
4850
      if dev.logical_id[0] == instance.primary_node:
4851
        snode = dev.logical_id[1]
4852
      else:
4853
        snode = dev.logical_id[0]
4854

    
4855
    if snode and not static:
4856
      self.cfg.SetDiskID(dev, snode)
4857
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4858
      dev_sstatus.Raise()
4859
      dev_sstatus = dev_sstatus.data
4860
    else:
4861
      dev_sstatus = None
4862

    
4863
    if dev.children:
4864
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4865
                      for child in dev.children]
4866
    else:
4867
      dev_children = []
4868

    
4869
    data = {
4870
      "iv_name": dev.iv_name,
4871
      "dev_type": dev.dev_type,
4872
      "logical_id": dev.logical_id,
4873
      "physical_id": dev.physical_id,
4874
      "pstatus": dev_pstatus,
4875
      "sstatus": dev_sstatus,
4876
      "children": dev_children,
4877
      "mode": dev.mode,
4878
      }
4879

    
4880
    return data
4881

    
4882
  def Exec(self, feedback_fn):
4883
    """Gather and return data"""
4884
    result = {}
4885

    
4886
    cluster = self.cfg.GetClusterInfo()
4887

    
4888
    for instance in self.wanted_instances:
4889
      if not self.op.static:
4890
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4891
                                                  instance.name,
4892
                                                  instance.hypervisor)
4893
        remote_info.Raise()
4894
        remote_info = remote_info.data
4895
        if remote_info and "state" in remote_info:
4896
          remote_state = "up"
4897
        else:
4898
          remote_state = "down"
4899
      else:
4900
        remote_state = None
4901
      if instance.status == "down":
4902
        config_state = "down"
4903
      else:
4904
        config_state = "up"
4905

    
4906
      disks = [self._ComputeDiskStatus(instance, None, device)
4907
               for device in instance.disks]
4908

    
4909
      idict = {
4910
        "name": instance.name,
4911
        "config_state": config_state,
4912
        "run_state": remote_state,
4913
        "pnode": instance.primary_node,
4914
        "snodes": instance.secondary_nodes,
4915
        "os": instance.os,
4916
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4917
        "disks": disks,
4918
        "hypervisor": instance.hypervisor,
4919
        "network_port": instance.network_port,
4920
        "hv_instance": instance.hvparams,
4921
        "hv_actual": cluster.FillHV(instance),
4922
        "be_instance": instance.beparams,
4923
        "be_actual": cluster.FillBE(instance),
4924
        }
4925

    
4926
      result[instance.name] = idict
4927

    
4928
    return result
4929

    
4930

    
4931
class LUSetInstanceParams(LogicalUnit):
4932
  """Modifies an instances's parameters.
4933

4934
  """
4935
  HPATH = "instance-modify"
4936
  HTYPE = constants.HTYPE_INSTANCE
4937
  _OP_REQP = ["instance_name"]
4938
  REQ_BGL = False
4939

    
4940
  def CheckArguments(self):
4941
    if not hasattr(self.op, 'nics'):
4942
      self.op.nics = []
4943
    if not hasattr(self.op, 'disks'):
4944
      self.op.disks = []
4945
    if not hasattr(self.op, 'beparams'):
4946
      self.op.beparams = {}
4947
    if not hasattr(self.op, 'hvparams'):
4948
      self.op.hvparams = {}
4949
    self.op.force = getattr(self.op, "force", False)
4950
    if not (self.op.nics or self.op.disks or
4951
            self.op.hvparams or self.op.beparams):
4952
      raise errors.OpPrereqError("No changes submitted")
4953

    
4954
    utils.CheckBEParams(self.op.beparams)
4955

    
4956
    # Disk validation
4957
    disk_addremove = 0
4958
    for disk_op, disk_dict in self.op.disks:
4959
      if disk_op == constants.DDM_REMOVE:
4960
        disk_addremove += 1
4961
        continue
4962
      elif disk_op == constants.DDM_ADD:
4963
        disk_addremove += 1
4964
      else:
4965
        if not isinstance(disk_op, int):
4966
          raise errors.OpPrereqError("Invalid disk index")
4967
      if disk_op == constants.DDM_ADD:
4968
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4969
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4970
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4971
        size = disk_dict.get('size', None)
4972
        if size is None:
4973
          raise errors.OpPrereqError("Required disk parameter size missing")
4974
        try:
4975
          size = int(size)
4976
        except ValueError, err:
4977
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4978
                                     str(err))
4979
        disk_dict['size'] = size
4980
      else:
4981
        # modification of disk
4982
        if 'size' in disk_dict:
4983
          raise errors.OpPrereqError("Disk size change not possible, use"
4984
                                     " grow-disk")
4985

    
4986
    if disk_addremove > 1:
4987
      raise errors.OpPrereqError("Only one disk add or remove operation"
4988
                                 " supported at a time")
4989

    
4990
    # NIC validation
4991
    nic_addremove = 0
4992
    for nic_op, nic_dict in self.op.nics:
4993
      if nic_op == constants.DDM_REMOVE:
4994
        nic_addremove += 1
4995
        continue
4996
      elif nic_op == constants.DDM_ADD:
4997
        nic_addremove += 1
4998
      else:
4999
        if not isinstance(nic_op, int):
5000
          raise errors.OpPrereqError("Invalid nic index")
5001

    
5002
      # nic_dict should be a dict
5003
      nic_ip = nic_dict.get('ip', None)
5004
      if nic_ip is not None:
5005
        if nic_ip.lower() == "none":
5006
          nic_dict['ip'] = None
5007
        else:
5008
          if not utils.IsValidIP(nic_ip):
5009
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5010
      # we can only check None bridges and assign the default one
5011
      nic_bridge = nic_dict.get('bridge', None)
5012
      if nic_bridge is None:
5013
        nic_dict['bridge'] = self.cfg.GetDefBridge()
5014
      # but we can validate MACs
5015
      nic_mac = nic_dict.get('mac', None)
5016
      if nic_mac is not None:
5017
        if self.cfg.IsMacInUse(nic_mac):
5018
          raise errors.OpPrereqError("MAC address %s already in use"
5019
                                     " in cluster" % nic_mac)
5020
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5021
          if not utils.IsValidMac(nic_mac):
5022
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5023
    if nic_addremove > 1:
5024
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5025
                                 " supported at a time")
5026

    
5027
  def ExpandNames(self):
5028
    self._ExpandAndLockInstance()
5029
    self.needed_locks[locking.LEVEL_NODE] = []
5030
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5031

    
5032
  def DeclareLocks(self, level):
5033
    if level == locking.LEVEL_NODE:
5034
      self._LockInstancesNodes()
5035

    
5036
  def BuildHooksEnv(self):
5037
    """Build hooks env.
5038

5039
    This runs on the master, primary and secondaries.
5040

5041
    """
5042
    args = dict()
5043
    if constants.BE_MEMORY in self.be_new:
5044
      args['memory'] = self.be_new[constants.BE_MEMORY]
5045
    if constants.BE_VCPUS in self.be_new:
5046
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5047
    # FIXME: readd disk/nic changes
5048
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5049
    nl = [self.cfg.GetMasterNode(),
5050
          self.instance.primary_node] + list(self.instance.secondary_nodes)
5051
    return env, nl, nl
5052

    
5053
  def CheckPrereq(self):
5054
    """Check prerequisites.
5055

5056
    This only checks the instance list against the existing names.
5057

5058
    """
5059
    force = self.force = self.op.force
5060

    
5061
    # checking the new params on the primary/secondary nodes
5062

    
5063
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5064
    assert self.instance is not None, \
5065
      "Cannot retrieve locked instance %s" % self.op.instance_name
5066
    pnode = self.instance.primary_node
5067
    nodelist = [pnode]
5068
    nodelist.extend(instance.secondary_nodes)
5069

    
5070
    # hvparams processing
5071
    if self.op.hvparams:
5072
      i_hvdict = copy.deepcopy(instance.hvparams)
5073
      for key, val in self.op.hvparams.iteritems():
5074
        if val == constants.VALUE_DEFAULT:
5075
          try:
5076
            del i_hvdict[key]
5077
          except KeyError:
5078
            pass
5079
        elif val == constants.VALUE_NONE:
5080
          i_hvdict[key] = None
5081
        else:
5082
          i_hvdict[key] = val
5083
      cluster = self.cfg.GetClusterInfo()
5084
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5085
                                i_hvdict)
5086
      # local check
5087
      hypervisor.GetHypervisor(
5088
        instance.hypervisor).CheckParameterSyntax(hv_new)
5089
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5090
      self.hv_new = hv_new # the new actual values
5091
      self.hv_inst = i_hvdict # the new dict (without defaults)
5092
    else:
5093
      self.hv_new = self.hv_inst = {}
5094

    
5095
    # beparams processing
5096
    if self.op.beparams:
5097
      i_bedict = copy.deepcopy(instance.beparams)
5098
      for key, val in self.op.beparams.iteritems():
5099
        if val == constants.VALUE_DEFAULT:
5100
          try:
5101
            del i_bedict[key]
5102
          except KeyError:
5103
            pass
5104
        else:
5105
          i_bedict[key] = val
5106
      cluster = self.cfg.GetClusterInfo()
5107
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5108
                                i_bedict)
5109
      self.be_new = be_new # the new actual values
5110
      self.be_inst = i_bedict # the new dict (without defaults)
5111
    else:
5112
      self.be_new = self.be_inst = {}
5113

    
5114
    self.warn = []
5115

    
5116
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5117
      mem_check_list = [pnode]
5118
      if be_new[constants.BE_AUTO_BALANCE]:
5119
        # either we changed auto_balance to yes or it was from before
5120
        mem_check_list.extend(instance.secondary_nodes)
5121
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5122
                                                  instance.hypervisor)
5123
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5124
                                         instance.hypervisor)
5125
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5126
        # Assume the primary node is unreachable and go ahead
5127
        self.warn.append("Can't get info from primary node %s" % pnode)
5128
      else:
5129
        if not instance_info.failed and instance_info.data:
5130
          current_mem = instance_info.data['memory']
5131
        else:
5132
          # Assume instance not running
5133
          # (there is a slight race condition here, but it's not very probable,
5134
          # and we have no other way to check)
5135
          current_mem = 0
5136
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5137
                    nodeinfo[pnode].data['memory_free'])
5138
        if miss_mem > 0:
5139
          raise errors.OpPrereqError("This change will prevent the instance"
5140
                                     " from starting, due to %d MB of memory"
5141
                                     " missing on its primary node" % miss_mem)
5142

    
5143
      if be_new[constants.BE_AUTO_BALANCE]:
5144
        for node, nres in instance.secondary_nodes.iteritems():
5145
          if nres.failed or not isinstance(nres.data, dict):
5146
            self.warn.append("Can't get info from secondary node %s" % node)
5147
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5148
            self.warn.append("Not enough memory to failover instance to"
5149
                             " secondary node %s" % node)
5150

    
5151
    # NIC processing
5152
    for nic_op, nic_dict in self.op.nics:
5153
      if nic_op == constants.DDM_REMOVE:
5154
        if not instance.nics:
5155
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5156
        continue
5157
      if nic_op != constants.DDM_ADD:
5158
        # an existing nic
5159
        if nic_op < 0 or nic_op >= len(instance.nics):
5160
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5161
                                     " are 0 to %d" %
5162
                                     (nic_op, len(instance.nics)))
5163
      nic_bridge = nic_dict.get('bridge', None)
5164
      if nic_bridge is not None:
5165
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5166
          msg = ("Bridge '%s' doesn't exist on one of"
5167
                 " the instance nodes" % nic_bridge)
5168
          if self.force:
5169
            self.warn.append(msg)
5170
          else:
5171
            raise errors.OpPrereqError(msg)
5172

    
5173
    # DISK processing
5174
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5175
      raise errors.OpPrereqError("Disk operations not supported for"
5176
                                 " diskless instances")
5177
    for disk_op, disk_dict in self.op.disks:
5178
      if disk_op == constants.DDM_REMOVE:
5179
        if len(instance.disks) == 1:
5180
          raise errors.OpPrereqError("Cannot remove the last disk of"
5181
                                     " an instance")
5182
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5183
        ins_l = ins_l[pnode]
5184
        if not type(ins_l) is list:
5185
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5186
        if instance.name in ins_l:
5187
          raise errors.OpPrereqError("Instance is running, can't remove"
5188
                                     " disks.")
5189

    
5190
      if (disk_op == constants.DDM_ADD and
5191
          len(instance.nics) >= constants.MAX_DISKS):
5192
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5193
                                   " add more" % constants.MAX_DISKS)
5194
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5195
        # an existing disk
5196
        if disk_op < 0 or disk_op >= len(instance.disks):
5197
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5198
                                     " are 0 to %d" %
5199
                                     (disk_op, len(instance.disks)))
5200

    
5201
    return
5202

    
5203
  def Exec(self, feedback_fn):
5204
    """Modifies an instance.
5205

5206
    All parameters take effect only at the next restart of the instance.
5207

5208
    """
5209
    # Process here the warnings from CheckPrereq, as we don't have a
5210
    # feedback_fn there.
5211
    for warn in self.warn:
5212
      feedback_fn("WARNING: %s" % warn)
5213

    
5214
    result = []
5215
    instance = self.instance
5216
    # disk changes
5217
    for disk_op, disk_dict in self.op.disks:
5218
      if disk_op == constants.DDM_REMOVE:
5219
        # remove the last disk
5220
        device = instance.disks.pop()
5221
        device_idx = len(instance.disks)
5222
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5223
          self.cfg.SetDiskID(disk, node)
5224
          result = self.rpc.call_blockdev_remove(node, disk)
5225
          if result.failed or not result.data:
5226
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
5227
                                 " continuing anyway", device_idx, node)
5228
        result.append(("disk/%d" % device_idx, "remove"))
5229
      elif disk_op == constants.DDM_ADD:
5230
        # add a new disk
5231
        if instance.disk_template == constants.DT_FILE:
5232
          file_driver, file_path = instance.disks[0].logical_id
5233
          file_path = os.path.dirname(file_path)
5234
        else:
5235
          file_driver = file_path = None
5236
        disk_idx_base = len(instance.disks)
5237
        new_disk = _GenerateDiskTemplate(self,
5238
                                         instance.disk_template,
5239
                                         instance, instance.primary_node,
5240
                                         instance.secondary_nodes,
5241
                                         [disk_dict],
5242
                                         file_path,
5243
                                         file_driver,
5244
                                         disk_idx_base)[0]
5245
        new_disk.mode = disk_dict['mode']
5246
        instance.disks.append(new_disk)
5247
        info = _GetInstanceInfoText(instance)
5248

    
5249
        logging.info("Creating volume %s for instance %s",
5250
                     new_disk.iv_name, instance.name)
5251
        # Note: this needs to be kept in sync with _CreateDisks
5252
        #HARDCODE
5253
        for secondary_node in instance.secondary_nodes:
5254
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5255
                                            new_disk, False, info):
5256
            self.LogWarning("Failed to create volume %s (%s) on"
5257
                            " secondary node %s!",
5258
                            new_disk.iv_name, new_disk, secondary_node)
5259
        #HARDCODE
5260
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5261
                                        instance, new_disk, info):
5262
          self.LogWarning("Failed to create volume %s on primary!",
5263
                          new_disk.iv_name)
5264
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5265
                       (new_disk.size, new_disk.mode)))
5266
      else:
5267
        # change a given disk
5268
        instance.disks[disk_op].mode = disk_dict['mode']
5269
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5270
    # NIC changes
5271
    for nic_op, nic_dict in self.op.nics:
5272
      if nic_op == constants.DDM_REMOVE:
5273
        # remove the last nic
5274
        del instance.nics[-1]
5275
        result.append(("nic.%d" % len(instance.nics), "remove"))
5276
      elif nic_op == constants.DDM_ADD:
5277
        # add a new nic
5278
        if 'mac' not in nic_dict:
5279
          mac = constants.VALUE_GENERATE
5280
        else:
5281
          mac = nic_dict['mac']
5282
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5283
          mac = self.cfg.GenerateMAC()
5284
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5285
                              bridge=nic_dict.get('bridge', None))
5286
        instance.nics.append(new_nic)
5287
        result.append(("nic.%d" % (len(instance.nics) - 1),
5288
                       "add:mac=%s,ip=%s,bridge=%s" %
5289
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
5290
      else:
5291
        # change a given nic
5292
        for key in 'mac', 'ip', 'bridge':
5293
          if key in nic_dict:
5294
            setattr(instance.nics[nic_op], key, nic_dict[key])
5295
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5296

    
5297
    # hvparams changes
5298
    if self.op.hvparams:
5299
      instance.hvparams = self.hv_new
5300
      for key, val in self.op.hvparams.iteritems():
5301
        result.append(("hv/%s" % key, val))
5302

    
5303
    # beparams changes
5304
    if self.op.beparams:
5305
      instance.beparams = self.be_inst
5306
      for key, val in self.op.beparams.iteritems():
5307
        result.append(("be/%s" % key, val))
5308

    
5309
    self.cfg.Update(instance)
5310

    
5311
    return result
5312

    
5313

    
5314
class LUQueryExports(NoHooksLU):
5315
  """Query the exports list
5316

5317
  """
5318
  _OP_REQP = ['nodes']
5319
  REQ_BGL = False
5320

    
5321
  def ExpandNames(self):
5322
    self.needed_locks = {}
5323
    self.share_locks[locking.LEVEL_NODE] = 1
5324
    if not self.op.nodes:
5325
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5326
    else:
5327
      self.needed_locks[locking.LEVEL_NODE] = \
5328
        _GetWantedNodes(self, self.op.nodes)
5329

    
5330
  def CheckPrereq(self):
5331
    """Check prerequisites.
5332

5333
    """
5334
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5335

    
5336
  def Exec(self, feedback_fn):
5337
    """Compute the list of all the exported system images.
5338

5339
    @rtype: dict
5340
    @return: a dictionary with the structure node->(export-list)
5341
        where export-list is a list of the instances exported on
5342
        that node.
5343

5344
    """
5345
    result = self.rpc.call_export_list(self.nodes)
5346
    result.Raise()
5347
    return result.data
5348

    
5349

    
5350
class LUExportInstance(LogicalUnit):
5351
  """Export an instance to an image in the cluster.
5352

5353
  """
5354
  HPATH = "instance-export"
5355
  HTYPE = constants.HTYPE_INSTANCE
5356
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
5357
  REQ_BGL = False
5358

    
5359
  def ExpandNames(self):
5360
    self._ExpandAndLockInstance()
5361
    # FIXME: lock only instance primary and destination node
5362
    #
5363
    # Sad but true, for now we have do lock all nodes, as we don't know where
5364
    # the previous export might be, and and in this LU we search for it and
5365
    # remove it from its current node. In the future we could fix this by:
5366
    #  - making a tasklet to search (share-lock all), then create the new one,
5367
    #    then one to remove, after
5368
    #  - removing the removal operation altoghether
5369
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5370

    
5371
  def DeclareLocks(self, level):
5372
    """Last minute lock declaration."""
5373
    # All nodes are locked anyway, so nothing to do here.
5374

    
5375
  def BuildHooksEnv(self):
5376
    """Build hooks env.
5377

5378
    This will run on the master, primary node and target node.
5379

5380
    """
5381
    env = {
5382
      "EXPORT_NODE": self.op.target_node,
5383
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5384
      }
5385
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5386
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5387
          self.op.target_node]
5388
    return env, nl, nl
5389

    
5390
  def CheckPrereq(self):
5391
    """Check prerequisites.
5392

5393
    This checks that the instance and node names are valid.
5394

5395
    """
5396
    instance_name = self.op.instance_name
5397
    self.instance = self.cfg.GetInstanceInfo(instance_name)
5398
    assert self.instance is not None, \
5399
          "Cannot retrieve locked instance %s" % self.op.instance_name
5400

    
5401
    self.dst_node = self.cfg.GetNodeInfo(
5402
      self.cfg.ExpandNodeName(self.op.target_node))
5403

    
5404
    if self.dst_node is None:
5405
      # This is wrong node name, not a non-locked node
5406
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5407

    
5408
    # instance disk type verification
5409
    for disk in self.instance.disks:
5410
      if disk.dev_type == constants.LD_FILE:
5411
        raise errors.OpPrereqError("Export not supported for instances with"
5412
                                   " file-based disks")
5413

    
5414
  def Exec(self, feedback_fn):
5415
    """Export an instance to an image in the cluster.
5416

5417
    """
5418
    instance = self.instance
5419
    dst_node = self.dst_node
5420
    src_node = instance.primary_node
5421
    if self.op.shutdown:
5422
      # shutdown the instance, but not the disks
5423
      result = self.rpc.call_instance_shutdown(src_node, instance)
5424
      result.Raise()
5425
      if not result.data:
5426
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5427
                                 (instance.name, src_node))
5428

    
5429
    vgname = self.cfg.GetVGName()
5430

    
5431
    snap_disks = []
5432

    
5433
    try:
5434
      for disk in instance.disks:
5435
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5436
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5437
        if new_dev_name.failed or not new_dev_name.data:
5438
          self.LogWarning("Could not snapshot block device %s on node %s",
5439
                          disk.logical_id[1], src_node)
5440
          snap_disks.append(False)
5441
        else:
5442
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5443
                                 logical_id=(vgname, new_dev_name.data),
5444
                                 physical_id=(vgname, new_dev_name.data),
5445
                                 iv_name=disk.iv_name)
5446
          snap_disks.append(new_dev)
5447

    
5448
    finally:
5449
      if self.op.shutdown and instance.status == "up":
5450
        result = self.rpc.call_instance_start(src_node, instance, None)
5451
        if result.failed or not result.data:
5452
          _ShutdownInstanceDisks(self, instance)
5453
          raise errors.OpExecError("Could not start instance")
5454

    
5455
    # TODO: check for size
5456

    
5457
    cluster_name = self.cfg.GetClusterName()
5458
    for idx, dev in enumerate(snap_disks):
5459
      if dev:
5460
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5461
                                               instance, cluster_name, idx)
5462
        if result.failed or not result.data:
5463
          self.LogWarning("Could not export block device %s from node %s to"
5464
                          " node %s", dev.logical_id[1], src_node,
5465
                          dst_node.name)
5466
        result = self.rpc.call_blockdev_remove(src_node, dev)
5467
        if result.failed or not result.data:
5468
          self.LogWarning("Could not remove snapshot block device %s from node"
5469
                          " %s", dev.logical_id[1], src_node)
5470

    
5471
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5472
    if result.failed or not result.data:
5473
      self.LogWarning("Could not finalize export for instance %s on node %s",
5474
                      instance.name, dst_node.name)
5475

    
5476
    nodelist = self.cfg.GetNodeList()
5477
    nodelist.remove(dst_node.name)
5478

    
5479
    # on one-node clusters nodelist will be empty after the removal
5480
    # if we proceed the backup would be removed because OpQueryExports
5481
    # substitutes an empty list with the full cluster node list.
5482
    if nodelist:
5483
      exportlist = self.rpc.call_export_list(nodelist)
5484
      for node in exportlist:
5485
        if exportlist[node].failed:
5486
          continue
5487
        if instance.name in exportlist[node].data:
5488
          if not self.rpc.call_export_remove(node, instance.name):
5489
            self.LogWarning("Could not remove older export for instance %s"
5490
                            " on node %s", instance.name, node)
5491

    
5492

    
5493
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # assemble the per-node result
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result