#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
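
  A minimal subclass might look like the following sketch (illustrative
  only; no such LU exists in this module and the names are made up)::

    class LUExampleNoop(LogicalUnit):
      HPATH = "example-noop"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = []

      def ExpandNames(self):
        self.needed_locks = {}

      def CheckPrereq(self):
        pass

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.cfg.GetClusterName()}
        return env, [], []

      def Exec(self, feedback_fn):
        feedback_fn("nothing to do")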
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check, ensuring the
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing these checks separately is better because:
118

119
      - ExpandNames is left as a purely lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods no longer need to worry about missing parameters.
125

126
    """
127
    pass
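
    # A typical override might look like the following sketch (illustrative
    # only; "force" is a made-up opcode parameter used purely as an example):
    #
    #   def CheckArguments(self):
    #     if not hasattr(self.op, "force"):
    #       self.op.force = False
    #     if not isinstance(self.op.force, bool):
    #       raise errors.OpPrereqError("Parameter 'force' must be a boolean")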
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, etc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
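      # Acquire all node locks in shared mode (illustrative sketch; by
      # default locks are acquired exclusively)
      self.share_locks[locking.LEVEL_NODE] = 1
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
      }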
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_', as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If there are no nodes for a phase, return an empty list (and not None).
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
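
    Example return value, in the style used by the LUs in this module
    (illustrative sketch)::

      env = {"OP_TARGET": self.cfg.GetClusterName()}
      mn = self.cfg.GetMasterNode()
      return env, [mn], [mn]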
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
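
    Typical usage from an LU's ExpandNames, shown as an illustrative sketch
    of the common pattern (not tied to any particular LU)::

      def ExpandNames(self):
        self._ExpandAndLockInstance()
        self.needed_locks[locking.LEVEL_NODE] = []
        self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE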
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instance's nodes, or
295
    to just lock primaries or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
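
# Illustrative usage sketch (not a call site in this module): an LU's
# CheckPrereq would typically do
#   self.op.nodes = _GetWantedNodes(self, self.op.nodes)
# to turn user-supplied (possibly shortened) node names into expanded,
# sorted canonical names, raising OpPrereqError for unknown nodes.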
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = lu.cfg.GetInstanceList()
396
  return utils.NiceSort(wanted)
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the node is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445
                          memory, vcpus, nics):
446
  """Builds instance related env variables for hooks
447

448
  This builds the hook environment from individual variables.
449

450
  @type name: string
451
  @param name: the name of the instance
452
  @type primary_node: string
453
  @param primary_node: the name of the instance's primary node
454
  @type secondary_nodes: list
455
  @param secondary_nodes: list of secondary nodes as strings
456
  @type os_type: string
457
  @param os_type: the name of the instance's OS
458
  @type status: string
459
  @param status: the desired status of the instance
460
  @type memory: string
461
  @param memory: the memory size of the instance
462
  @type vcpus: string
463
  @param vcpus: the count of VCPUs the instance has
464
  @type nics: list
465
  @param nics: list of tuples (ip, bridge, mac) representing
466
      the NICs the instance  has
467
  @rtype: dict
468
  @return: the hook environment for this instance
469

470
  """
471
  env = {
472
    "OP_TARGET": name,
473
    "INSTANCE_NAME": name,
474
    "INSTANCE_PRIMARY": primary_node,
475
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476
    "INSTANCE_OS_TYPE": os_type,
477
    "INSTANCE_STATUS": status,
478
    "INSTANCE_MEMORY": memory,
479
    "INSTANCE_VCPUS": vcpus,
480
  }
481

    
482
  if nics:
483
    nic_count = len(nics)
484
    for idx, (ip, bridge, mac) in enumerate(nics):
485
      if ip is None:
486
        ip = ""
487
      env["INSTANCE_NIC%d_IP" % idx] = ip
488
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
490
  else:
491
    nic_count = 0
492

    
493
  env["INSTANCE_NIC_COUNT"] = nic_count
494

    
495
  return env
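
# For illustration (assumption about downstream behaviour, see BuildHooksEnv
# above): a single-NIC instance ends up with keys such as INSTANCE_NAME,
# INSTANCE_PRIMARY, INSTANCE_MEMORY, INSTANCE_NIC_COUNT=1 and
# INSTANCE_NIC0_IP/_BRIDGE/_HWADDR; the hooks runner later adds the
# "GANETI_" prefix before exporting them to hook scripts.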
496

    
497

    
498
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499
  """Builds instance related env variables for hooks from an object.
500

501
  @type lu: L{LogicalUnit}
502
  @param lu: the logical unit on whose behalf we execute
503
  @type instance: L{objects.Instance}
504
  @param instance: the instance for which we should build the
505
      environment
506
  @type override: dict
507
  @param override: dictionary with key/values that will override
508
      our values
509
  @rtype: dict
510
  @return: the hook environment dictionary
511

512
  """
513
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
514
  args = {
515
    'name': instance.name,
516
    'primary_node': instance.primary_node,
517
    'secondary_nodes': instance.secondary_nodes,
518
    'os_type': instance.os,
519
    'status': instance.status,
520
    'memory': bep[constants.BE_MEMORY],
521
    'vcpus': bep[constants.BE_VCPUS],
522
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
523
  }
524
  if override:
525
    args.update(override)
526
  return _BuildInstanceHookEnv(**args)
527

    
528

    
529
def _AdjustCandidatePool(lu):
530
  """Adjust the candidate pool after node operations.
531

532
  """
533
  mod_list = lu.cfg.MaintainCandidatePool()
534
  if mod_list:
535
    lu.LogInfo("Promoted nodes to master candidate role: %s",
536
               ", ".join(mod_list))
537
    for name in mod_list:
538
      lu.context.ReaddNode(name)
539
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
540
  if mc_now > mc_max:
541
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
542
               (mc_now, mc_max))
543

    
544

    
545
def _CheckInstanceBridgesExist(lu, instance):
546
  """Check that the brigdes needed by an instance exist.
547

548
  """
549
  # check bridges existence
550
  brlist = [nic.bridge for nic in instance.nics]
551
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
552
  result.Raise()
553
  if not result.data:
554
    raise errors.OpPrereqError("One or more target bridges %s does not"
555
                               " exist on destination node '%s'" %
556
                               (brlist, instance.primary_node))
557

    
558

    
559
class LUDestroyCluster(NoHooksLU):
560
  """Logical unit for destroying the cluster.
561

562
  """
563
  _OP_REQP = []
564

    
565
  def CheckPrereq(self):
566
    """Check prerequisites.
567

568
    This checks whether the cluster is empty.
569

570
    Any errors are signalled by raising errors.OpPrereqError.
571

572
    """
573
    master = self.cfg.GetMasterNode()
574

    
575
    nodelist = self.cfg.GetNodeList()
576
    if len(nodelist) != 1 or nodelist[0] != master:
577
      raise errors.OpPrereqError("There are still %d node(s) in"
578
                                 " this cluster." % (len(nodelist) - 1))
579
    instancelist = self.cfg.GetInstanceList()
580
    if instancelist:
581
      raise errors.OpPrereqError("There are still %d instance(s) in"
582
                                 " this cluster." % len(instancelist))
583

    
584
  def Exec(self, feedback_fn):
585
    """Destroys the cluster.
586

587
    """
588
    master = self.cfg.GetMasterNode()
589
    result = self.rpc.call_node_stop_master(master, False)
590
    result.Raise()
591
    if not result.data:
592
      raise errors.OpExecError("Could not disable the master role")
593
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594
    utils.CreateBackup(priv_key)
595
    utils.CreateBackup(pub_key)
596
    return master
597

    
598

    
599
class LUVerifyCluster(LogicalUnit):
600
  """Verifies the cluster status.
601

602
  """
603
  HPATH = "cluster-verify"
604
  HTYPE = constants.HTYPE_CLUSTER
605
  _OP_REQP = ["skip_checks"]
606
  REQ_BGL = False
607

    
608
  def ExpandNames(self):
609
    self.needed_locks = {
610
      locking.LEVEL_NODE: locking.ALL_SET,
611
      locking.LEVEL_INSTANCE: locking.ALL_SET,
612
    }
613
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
614

    
615
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616
                  node_result, feedback_fn, master_files):
617
    """Run multiple tests against a node.
618

619
    Test list:
620

621
      - compares ganeti version
622
      - checks vg existence and size > 20G
623
      - checks config file checksum
624
      - checks ssh to other nodes
625

626
    @type nodeinfo: L{objects.Node}
627
    @param nodeinfo: the node to check
628
    @param file_list: required list of files
629
    @param local_cksum: dictionary of local files and their checksums
630
    @param node_result: the results from the node
631
    @param feedback_fn: function used to accumulate results
632
    @param master_files: list of files that only masters should have
633

634
    """
635
    node = nodeinfo.name
636

    
637
    # main result, node_result should be a non-empty dict
638
    if not node_result or not isinstance(node_result, dict):
639
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
640
      return True
641

    
642
    # compares ganeti version
643
    local_version = constants.PROTOCOL_VERSION
644
    remote_version = node_result.get('version', None)
645
    if not remote_version:
646
      feedback_fn("  - ERROR: connection to %s failed" % (node))
647
      return True
648

    
649
    if local_version != remote_version:
650
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
651
                      (local_version, node, remote_version))
652
      return True
653

    
654
    # checks vg existence and size > 20G
655

    
656
    bad = False
657
    vglist = node_result.get(constants.NV_VGLIST, None)
658
    if not vglist:
659
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
660
                      (node,))
661
      bad = True
662
    else:
663
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664
                                            constants.MIN_VG_SIZE)
665
      if vgstatus:
666
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
667
        bad = True
668

    
669
    # checks config file checksum
670

    
671
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
672
    if not isinstance(remote_cksum, dict):
673
      bad = True
674
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
675
    else:
676
      for file_name in file_list:
677
        node_is_mc = nodeinfo.master_candidate
678
        must_have_file = file_name not in master_files
679
        if file_name not in remote_cksum:
680
          if node_is_mc or must_have_file:
681
            bad = True
682
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
683
        elif remote_cksum[file_name] != local_cksum[file_name]:
684
          if node_is_mc or must_have_file:
685
            bad = True
686
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
687
          else:
688
            # not candidate and this is not a must-have file
689
            bad = True
690
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
691
                        " '%s'" % file_name)
692
        else:
693
          # all good, except non-master/non-must have combination
694
          if not node_is_mc and not must_have_file:
695
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
696
                        " candidates" % file_name)
697

    
698
    # checks ssh to any
699

    
700
    if constants.NV_NODELIST not in node_result:
701
      bad = True
702
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
703
    else:
704
      if node_result[constants.NV_NODELIST]:
705
        bad = True
706
        for node in node_result[constants.NV_NODELIST]:
707
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
708
                          (node, node_result[constants.NV_NODELIST][node]))
709

    
710
    if constants.NV_NODENETTEST not in node_result:
711
      bad = True
712
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
713
    else:
714
      if node_result[constants.NV_NODENETTEST]:
715
        bad = True
716
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
717
        for node in nlist:
718
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
719
                          (node, node_result[constants.NV_NODENETTEST][node]))
720

    
721
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722
    if isinstance(hyp_result, dict):
723
      for hv_name, hv_result in hyp_result.iteritems():
724
        if hv_result is not None:
725
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
726
                      (hv_name, hv_result))
727
    return bad
728

    
729
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730
                      node_instance, feedback_fn):
731
    """Verify an instance.
732

733
    This function checks to see if the required block devices are
734
    available on the instance's node.
735

736
    """
737
    bad = False
738

    
739
    node_current = instanceconfig.primary_node
740

    
741
    node_vol_should = {}
742
    instanceconfig.MapLVsByNode(node_vol_should)
743

    
744
    for node in node_vol_should:
745
      for volume in node_vol_should[node]:
746
        if node not in node_vol_is or volume not in node_vol_is[node]:
747
          feedback_fn("  - ERROR: volume %s missing on node %s" %
748
                          (volume, node))
749
          bad = True
750

    
751
    if not instanceconfig.status == 'down':
752
      if (node_current not in node_instance or
753
          not instance in node_instance[node_current]):
754
        feedback_fn("  - ERROR: instance %s not running on node %s" %
755
                        (instance, node_current))
756
        bad = True
757

    
758
    for node in node_instance:
759
      if (not node == node_current):
760
        if instance in node_instance[node]:
761
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
762
                          (instance, node))
763
          bad = True
764

    
765
    return bad
766

    
767
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
768
    """Verify if there are any unknown volumes in the cluster.
769

770
    The .os, .swap and backup volumes are ignored. All other volumes are
771
    reported as unknown.
772

773
    """
774
    bad = False
775

    
776
    for node in node_vol_is:
777
      for volume in node_vol_is[node]:
778
        if node not in node_vol_should or volume not in node_vol_should[node]:
779
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
780
                      (volume, node))
781
          bad = True
782
    return bad
783

    
784
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
785
    """Verify the list of running instances.
786

787
    This checks what instances are running but unknown to the cluster.
788

789
    """
790
    bad = False
791
    for node in node_instance:
792
      for runninginstance in node_instance[node]:
793
        if runninginstance not in instancelist:
794
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
795
                          (runninginstance, node))
796
          bad = True
797
    return bad
798

    
799
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
800
    """Verify N+1 Memory Resilience.
801

802
    Check that if one single node dies we can still start all the instances it
803
    was primary for.
804

805
    """
806
    bad = False
807

    
808
    for node, nodeinfo in node_info.iteritems():
809
      # This code checks that every node which is now listed as secondary has
810
      # enough memory to host all instances it is supposed to, should a single
811
      # other node in the cluster fail.
812
      # FIXME: not ready for failover to an arbitrary node
813
      # FIXME: does not support file-backed instances
814
      # WARNING: we currently take into account down instances as well as up
815
      # ones, considering that even if they're down someone might want to start
816
      # them even in the event of a node failure.
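      # Worked example (illustrative): if this node is secondary for two
      # instances whose primary is node A (needing 1024 and 512 MB) and one
      # whose primary is node B (needing 2048 MB), it must keep at least
      # 1536 MB free to survive the loss of A and 2048 MB free to survive
      # the loss of B; each primary-node group is checked independently below.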
817
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
818
        needed_mem = 0
819
        for instance in instances:
820
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
821
          if bep[constants.BE_AUTO_BALANCE]:
822
            needed_mem += bep[constants.BE_MEMORY]
823
        if nodeinfo['mfree'] < needed_mem:
824
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
825
                      " failovers should node %s fail" % (node, prinode))
826
          bad = True
827
    return bad
828

    
829
  def CheckPrereq(self):
830
    """Check prerequisites.
831

832
    Transform the list of checks we're going to skip into a set and check that
833
    all its members are valid.
834

835
    """
836
    self.skip_set = frozenset(self.op.skip_checks)
837
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
838
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
839

    
840
  def BuildHooksEnv(self):
841
    """Build hooks env.
842

843
    Cluster-Verify hooks are run only in the post phase; their failure causes
    the output to be logged in the verify output and the verification to fail.
845

846
    """
847
    all_nodes = self.cfg.GetNodeList()
848
    # TODO: populate the environment with useful information for verify hooks
849
    env = {}
850
    return env, [], all_nodes
851

    
852
  def Exec(self, feedback_fn):
853
    """Verify integrity of cluster, performing various test on nodes.
854

855
    """
856
    bad = False
857
    feedback_fn("* Verifying global settings")
858
    for msg in self.cfg.VerifyConfig():
859
      feedback_fn("  - ERROR: %s" % msg)
860

    
861
    vg_name = self.cfg.GetVGName()
862
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
863
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
864
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
865
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
866
    i_non_redundant = [] # Non redundant instances
867
    i_non_a_balanced = [] # Non auto-balanced instances
868
    node_volume = {}
869
    node_instance = {}
870
    node_info = {}
871
    instance_cfg = {}
872

    
873
    # FIXME: verify OS list
874
    # do local checksums
875
    master_files = [constants.CLUSTER_CONF_FILE]
876

    
877
    file_names = ssconf.SimpleStore().GetFileList()
878
    file_names.append(constants.SSL_CERT_FILE)
879
    file_names.extend(master_files)
880

    
881
    local_checksums = utils.FingerprintFiles(file_names)
882

    
883
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
884
    node_verify_param = {
885
      constants.NV_FILELIST: file_names,
886
      constants.NV_NODELIST: nodelist,
887
      constants.NV_HYPERVISOR: hypervisors,
888
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
889
                                  node.secondary_ip) for node in nodeinfo],
890
      constants.NV_LVLIST: vg_name,
891
      constants.NV_INSTANCELIST: hypervisors,
892
      constants.NV_VGLIST: None,
893
      constants.NV_VERSION: None,
894
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
895
      }
896
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
897
                                           self.cfg.GetClusterName())
898

    
899
    cluster = self.cfg.GetClusterInfo()
900
    master_node = self.cfg.GetMasterNode()
901
    for node_i in nodeinfo:
902
      node = node_i.name
903
      nresult = all_nvinfo[node].data
904

    
905
      if node == master_node:
906
        ntype = "master"
907
      elif node_i.master_candidate:
908
        ntype = "master candidate"
909
      else:
910
        ntype = "regular"
911
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
912

    
913
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
914
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
915
        bad = True
916
        continue
917

    
918
      result = self._VerifyNode(node_i, file_names, local_checksums,
919
                                nresult, feedback_fn, master_files)
920
      bad = bad or result
921

    
922
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
923
      if isinstance(lvdata, basestring):
924
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
925
                    (node, lvdata.encode('string_escape')))
926
        bad = True
927
        node_volume[node] = {}
928
      elif not isinstance(lvdata, dict):
929
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
930
        bad = True
931
        continue
932
      else:
933
        node_volume[node] = lvdata
934

    
935
      # node_instance
936
      idata = nresult.get(constants.NV_INSTANCELIST, None)
937
      if not isinstance(idata, list):
938
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
939
                    (node,))
940
        bad = True
941
        continue
942

    
943
      node_instance[node] = idata
944

    
945
      # node_info
946
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
947
      if not isinstance(nodeinfo, dict):
948
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
949
        bad = True
950
        continue
951

    
952
      try:
953
        node_info[node] = {
954
          "mfree": int(nodeinfo['memory_free']),
955
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
956
          "pinst": [],
957
          "sinst": [],
958
          # dictionary holding all instances this node is secondary for,
959
          # grouped by their primary node. Each key is a cluster node, and each
960
          # value is a list of instances which have the key as primary and the
961
          # current node as secondary.  this is handy to calculate N+1 memory
962
          # availability if you can only failover from a primary to its
963
          # secondary.
964
          "sinst-by-pnode": {},
965
        }
966
      except ValueError:
967
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
968
        bad = True
969
        continue
970

    
971
    node_vol_should = {}
972

    
973
    for instance in instancelist:
974
      feedback_fn("* Verifying instance %s" % instance)
975
      inst_config = self.cfg.GetInstanceInfo(instance)
976
      result =  self._VerifyInstance(instance, inst_config, node_volume,
977
                                     node_instance, feedback_fn)
978
      bad = bad or result
979

    
980
      inst_config.MapLVsByNode(node_vol_should)
981

    
982
      instance_cfg[instance] = inst_config
983

    
984
      pnode = inst_config.primary_node
985
      if pnode in node_info:
986
        node_info[pnode]['pinst'].append(instance)
987
      else:
988
        feedback_fn("  - ERROR: instance %s, connection to primary node"
989
                    " %s failed" % (instance, pnode))
990
        bad = True
991

    
992
      # If the instance is non-redundant we cannot survive losing its primary
993
      # node, so we are not N+1 compliant. On the other hand we have no disk
994
      # templates with more than one secondary so that situation is not well
995
      # supported either.
996
      # FIXME: does not support file-backed instances
997
      if len(inst_config.secondary_nodes) == 0:
998
        i_non_redundant.append(instance)
999
      elif len(inst_config.secondary_nodes) > 1:
1000
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1001
                    % instance)
1002

    
1003
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1004
        i_non_a_balanced.append(instance)
1005

    
1006
      for snode in inst_config.secondary_nodes:
1007
        if snode in node_info:
1008
          node_info[snode]['sinst'].append(instance)
1009
          if pnode not in node_info[snode]['sinst-by-pnode']:
1010
            node_info[snode]['sinst-by-pnode'][pnode] = []
1011
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1012
        else:
1013
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1014
                      " %s failed" % (instance, snode))
1015

    
1016
    feedback_fn("* Verifying orphan volumes")
1017
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1018
                                       feedback_fn)
1019
    bad = bad or result
1020

    
1021
    feedback_fn("* Verifying remaining instances")
1022
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1023
                                         feedback_fn)
1024
    bad = bad or result
1025

    
1026
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1027
      feedback_fn("* Verifying N+1 Memory redundancy")
1028
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1029
      bad = bad or result
1030

    
1031
    feedback_fn("* Other Notes")
1032
    if i_non_redundant:
1033
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1034
                  % len(i_non_redundant))
1035

    
1036
    if i_non_a_balanced:
1037
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1038
                  % len(i_non_a_balanced))
1039

    
1040
    return not bad
1041

    
1042
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1043
    """Analize the post-hooks' result
1044

1045
    This method analyses the hook result, handles it, and sends some
1046
    nicely-formatted feedback back to the user.
1047

1048
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1049
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1050
    @param hooks_results: the results of the multi-node hooks rpc call
1051
    @param feedback_fn: function used to send feedback back to the caller
1052
    @param lu_result: previous Exec result
1053
    @return: the new Exec result, based on the previous result
1054
        and hook results
1055

1056
    """
1057
    # We only really run POST phase hooks, and are only interested in
1058
    # their results
1059
    if phase == constants.HOOKS_PHASE_POST:
1060
      # Used to change hooks' output to proper indentation
1061
      indent_re = re.compile('^', re.M)
1062
      feedback_fn("* Hooks Results")
1063
      if not hooks_results:
1064
        feedback_fn("  - ERROR: general communication failure")
1065
        lu_result = 1
1066
      else:
1067
        for node_name in hooks_results:
1068
          show_node_header = True
1069
          res = hooks_results[node_name]
1070
          if res.failed or res.data is False or not isinstance(res.data, list):
1071
            feedback_fn("    Communication failure in hooks execution")
1072
            lu_result = 1
1073
            continue
1074
          for script, hkr, output in res.data:
1075
            if hkr == constants.HKR_FAIL:
1076
              # The node header is only shown once, if there are
1077
              # failing hooks on that node
1078
              if show_node_header:
1079
                feedback_fn("  Node %s:" % node_name)
1080
                show_node_header = False
1081
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1082
              output = indent_re.sub('      ', output)
1083
              feedback_fn("%s" % output)
1084
              lu_result = 1
1085

    
1086
      return lu_result
1087

    
1088

    
1089
class LUVerifyDisks(NoHooksLU):
1090
  """Verifies the cluster disks status.
1091

1092
  """
1093
  _OP_REQP = []
1094
  REQ_BGL = False
1095

    
1096
  def ExpandNames(self):
1097
    self.needed_locks = {
1098
      locking.LEVEL_NODE: locking.ALL_SET,
1099
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1100
    }
1101
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1102

    
1103
  def CheckPrereq(self):
1104
    """Check prerequisites.
1105

1106
    This has no prerequisites.
1107

1108
    """
1109
    pass
1110

    
1111
  def Exec(self, feedback_fn):
1112
    """Verify integrity of cluster disks.
1113

1114
    """
1115
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
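    # The return value is a 4-tuple: (nodes that could not be queried,
    # per-node LVM error messages, instances with at least one offline
    # volume, and a map of instance name to missing (node, volume) pairs)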
1116

    
1117
    vg_name = self.cfg.GetVGName()
1118
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1119
    instances = [self.cfg.GetInstanceInfo(name)
1120
                 for name in self.cfg.GetInstanceList()]
1121

    
1122
    nv_dict = {}
1123
    for inst in instances:
1124
      inst_lvs = {}
1125
      if (inst.status != "up" or
1126
          inst.disk_template not in constants.DTS_NET_MIRROR):
1127
        continue
1128
      inst.MapLVsByNode(inst_lvs)
1129
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1130
      for node, vol_list in inst_lvs.iteritems():
1131
        for vol in vol_list:
1132
          nv_dict[(node, vol)] = inst
1133

    
1134
    if not nv_dict:
1135
      return result
1136

    
1137
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1138

    
1139
    to_act = set()
1140
    for node in nodes:
1141
      # node_volume
1142
      lvs = node_lvs[node]
1143
      if lvs.failed:
1144
        self.LogWarning("Connection to node %s failed: %s" %
1145
                        (node, lvs.data))
1146
        continue
1147
      lvs = lvs.data
1148
      if isinstance(lvs, basestring):
1149
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1150
        res_nlvm[node] = lvs
1151
      elif not isinstance(lvs, dict):
1152
        logging.warning("Connection to node %s failed or invalid data"
1153
                        " returned", node)
1154
        res_nodes.append(node)
1155
        continue
1156

    
1157
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1158
        inst = nv_dict.pop((node, lv_name), None)
1159
        if (not lv_online and inst is not None
1160
            and inst.name not in res_instances):
1161
          res_instances.append(inst.name)
1162

    
1163
    # any leftover items in nv_dict are missing LVs, let's arrange the
1164
    # data better
1165
    for key, inst in nv_dict.iteritems():
1166
      if inst.name not in res_missing:
1167
        res_missing[inst.name] = []
1168
      res_missing[inst.name].append(key)
1169

    
1170
    return result
1171

    
1172

    
1173
class LURenameCluster(LogicalUnit):
1174
  """Rename the cluster.
1175

1176
  """
1177
  HPATH = "cluster-rename"
1178
  HTYPE = constants.HTYPE_CLUSTER
1179
  _OP_REQP = ["name"]
1180

    
1181
  def BuildHooksEnv(self):
1182
    """Build hooks env.
1183

1184
    """
1185
    env = {
1186
      "OP_TARGET": self.cfg.GetClusterName(),
1187
      "NEW_NAME": self.op.name,
1188
      }
1189
    mn = self.cfg.GetMasterNode()
1190
    return env, [mn], [mn]
1191

    
1192
  def CheckPrereq(self):
1193
    """Verify that the passed name is a valid one.
1194

1195
    """
1196
    hostname = utils.HostInfo(self.op.name)
1197

    
1198
    new_name = hostname.name
1199
    self.ip = new_ip = hostname.ip
1200
    old_name = self.cfg.GetClusterName()
1201
    old_ip = self.cfg.GetMasterIP()
1202
    if new_name == old_name and new_ip == old_ip:
1203
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1204
                                 " cluster has changed")
1205
    if new_ip != old_ip:
1206
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1207
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1208
                                   " reachable on the network. Aborting." %
1209
                                   new_ip)
1210

    
1211
    self.op.name = new_name
1212

    
1213
  def Exec(self, feedback_fn):
1214
    """Rename the cluster.
1215

1216
    """
1217
    clustername = self.op.name
1218
    ip = self.ip
1219

    
1220
    # shutdown the master IP
1221
    master = self.cfg.GetMasterNode()
1222
    result = self.rpc.call_node_stop_master(master, False)
1223
    if result.failed or not result.data:
1224
      raise errors.OpExecError("Could not disable the master role")
1225

    
1226
    try:
1227
      cluster = self.cfg.GetClusterInfo()
1228
      cluster.cluster_name = clustername
1229
      cluster.master_ip = ip
1230
      self.cfg.Update(cluster)
1231

    
1232
      # update the known hosts file
1233
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1234
      node_list = self.cfg.GetNodeList()
1235
      try:
1236
        node_list.remove(master)
1237
      except ValueError:
1238
        pass
1239
      result = self.rpc.call_upload_file(node_list,
1240
                                         constants.SSH_KNOWN_HOSTS_FILE)
1241
      for to_node, to_result in result.iteritems():
1242
        if to_result.failed or not to_result.data:
1243
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1244

    
1245
    finally:
1246
      result = self.rpc.call_node_start_master(master, False)
1247
      if result.failed or not result.data:
1248
        self.LogWarning("Could not re-enable the master role on"
1249
                        " the master, please restart manually.")
1250

    
1251

    
1252
def _RecursiveCheckIfLVMBased(disk):
1253
  """Check if the given disk or its children are lvm-based.
1254

1255
  @type disk: L{objects.Disk}
1256
  @param disk: the disk to check
1257
  @rtype: boolean
1258
  @return: boolean indicating whether a LD_LV dev_type was found or not
1259

1260
  """
1261
  if disk.children:
1262
    for chdisk in disk.children:
1263
      if _RecursiveCheckIfLVMBased(chdisk):
1264
        return True
1265
  return disk.dev_type == constants.LD_LV
1266

    
1267

    
1268
class LUSetClusterParams(LogicalUnit):
1269
  """Change the parameters of the cluster.
1270

1271
  """
1272
  HPATH = "cluster-modify"
1273
  HTYPE = constants.HTYPE_CLUSTER
1274
  _OP_REQP = []
1275
  REQ_BGL = False
1276

    
1277
  def CheckParameters(self):
1278
    """Check parameters
1279

1280
    """
1281
    if not hasattr(self.op, "candidate_pool_size"):
1282
      self.op.candidate_pool_size = None
1283
    if self.op.candidate_pool_size is not None:
1284
      try:
1285
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1286
      except ValueError, err:
1287
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1288
                                   str(err))
1289
      if self.op.candidate_pool_size < 1:
1290
        raise errors.OpPrereqError("At least one master candidate needed")
1291

    
1292
  def ExpandNames(self):
1293
    # FIXME: in the future maybe other cluster params won't require checking on
1294
    # all nodes to be modified.
1295
    self.needed_locks = {
1296
      locking.LEVEL_NODE: locking.ALL_SET,
1297
    }
1298
    self.share_locks[locking.LEVEL_NODE] = 1
1299

    
1300
  def BuildHooksEnv(self):
1301
    """Build hooks env.
1302

1303
    """
1304
    env = {
1305
      "OP_TARGET": self.cfg.GetClusterName(),
1306
      "NEW_VG_NAME": self.op.vg_name,
1307
      }
1308
    mn = self.cfg.GetMasterNode()
1309
    return env, [mn], [mn]
1310

    
1311
  def CheckPrereq(self):
1312
    """Check prerequisites.
1313

1314
    This checks whether the given params don't conflict and
1315
    if the given volume group is valid.
1316

1317
    """
1318
    # FIXME: This only works because there is only one parameter that can be
1319
    # changed or removed.
1320
    if self.op.vg_name is not None and not self.op.vg_name:
1321
      instances = self.cfg.GetAllInstancesInfo().values()
1322
      for inst in instances:
1323
        for disk in inst.disks:
1324
          if _RecursiveCheckIfLVMBased(disk):
1325
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1326
                                       " lvm-based instances exist")
1327

    
1328
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1329

    
1330
    # if vg_name not None, checks given volume group on all nodes
1331
    if self.op.vg_name:
1332
      vglist = self.rpc.call_vg_list(node_list)
1333
      for node in node_list:
1334
        if vglist[node].failed:
1335
          # ignoring down node
1336
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1337
          continue
1338
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1339
                                              self.op.vg_name,
1340
                                              constants.MIN_VG_SIZE)
1341
        if vgstatus:
1342
          raise errors.OpPrereqError("Error on node '%s': %s" %
1343
                                     (node, vgstatus))
1344

    
1345
    self.cluster = cluster = self.cfg.GetClusterInfo()
1346
    # validate beparams changes
1347
    if self.op.beparams:
1348
      utils.CheckBEParams(self.op.beparams)
1349
      self.new_beparams = cluster.FillDict(
1350
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1351

    
1352
    # hypervisor list/parameters
1353
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1354
    if self.op.hvparams:
1355
      if not isinstance(self.op.hvparams, dict):
1356
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1357
      for hv_name, hv_dict in self.op.hvparams.items():
1358
        if hv_name not in self.new_hvparams:
1359
          self.new_hvparams[hv_name] = hv_dict
1360
        else:
1361
          self.new_hvparams[hv_name].update(hv_dict)
1362

    
1363
    if self.op.enabled_hypervisors is not None:
1364
      self.hv_list = self.op.enabled_hypervisors
1365
    else:
1366
      self.hv_list = cluster.enabled_hypervisors
1367

    
1368
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1369
      # either the enabled list has changed, or the parameters have, validate
1370
      for hv_name, hv_params in self.new_hvparams.items():
1371
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1372
            (self.op.enabled_hypervisors and
1373
             hv_name in self.op.enabled_hypervisors)):
1374
          # either this is a new hypervisor, or its parameters have changed
1375
          hv_class = hypervisor.GetHypervisor(hv_name)
1376
          hv_class.CheckParameterSyntax(hv_params)
1377
          _CheckHVParams(self, node_list, hv_name, hv_params)
1378

    
1379
  def Exec(self, feedback_fn):
1380
    """Change the parameters of the cluster.
1381

1382
    """
1383
    if self.op.vg_name is not None:
1384
      if self.op.vg_name != self.cfg.GetVGName():
1385
        self.cfg.SetVGName(self.op.vg_name)
1386
      else:
1387
        feedback_fn("Cluster LVM configuration already in desired"
1388
                    " state, not changing")
1389
    if self.op.hvparams:
1390
      self.cluster.hvparams = self.new_hvparams
1391
    if self.op.enabled_hypervisors is not None:
1392
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1393
    if self.op.beparams:
1394
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1395
    if self.op.candidate_pool_size is not None:
1396
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1397

    
1398
    self.cfg.Update(self.cluster)
1399

    
1400
    # we want to update nodes after the cluster so that if any errors
1401
    # happen, we have recorded and saved the cluster info
1402
    if self.op.candidate_pool_size is not None:
1403
      _AdjustCandidatePool(self)
1404

    
1405

    
1406
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


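# Illustrative usage sketch (hypothetical caller, not part of this section):
# LUs that wait for DRBD resync typically treat a False return from
# _WaitForSync as "disks degraded" rather than a communication error, e.g.
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("some disks of the instance are degraded")
#
# The exact handling and wording is up to each individual LU.
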
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


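# Illustrative usage sketch (hypothetical caller): a replace-disks style LU
# might check the local storage status only, e.g.
#
#   ok = _CheckDiskConsistency(self, dev, node, on_primary=False, ldisk=True)
#
# where ldisk=True selects index 6 (local disk status) instead of index 5
# (overall degradation) from the blockdev_find result, as implemented above.
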
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @returns: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


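# Illustrative data sketch (hypothetical values): for a two-node cluster where
# only node1 exports "debian-etch", _DiagnoseByOS would produce
#
#   {"debian-etch": {"node1": [<OS object>], "node2": []}}
#
# and the "valid" field above then evaluates to False, because node2's list is
# empty and therefore falsy.
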
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The node list is validated in _GetWantedNodes if it's non-empty;
    # an empty list needs no validation
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


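# Illustrative note (CLI syntax is an example only, not defined in this file):
# the do_locking decision above means a query that asks only for static fields
# such as "name,pip" can be answered from the configuration without node
# locks, while adding a dynamic field such as "mfree" forces shared node locks
# plus live call_node_info RPCs.
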
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    node_info = self.cfg.GetAllNodesInfo().values()
    mc_now, _ = self.cfg.GetMasterCandidateStats()
    master_candidate = mc_now < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate,
                                 offline=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if result.failed or not result.data:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if result[node].failed or not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


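# Illustrative sketch (hypothetical, documentation-only addresses): the
# single/dual-homed check in LUAddNode.CheckPrereq only compares topologies.
# A master with primary_ip == secondary_ip == 192.0.2.1 (single-homed) rejects
# a new node that supplies a distinct secondary IP, and a dual-homed master
# rejects a new node that supplies none.
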
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if not hasattr(self.op, 'master_candidate'):
      raise errors.OpPrereqError("Please pass at least one modification")
    self.op.master_candidate = bool(self.op.master_candidate)

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    If the node is being demoted from master candidate status, this checks
    that it is not the master and that enough other candidates would remain.

    """
    force = self.force = self.op.force

    if self.op.master_candidate == False:
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      node_info = self.cfg.GetAllNodesInfo().values()
      num_candidates = len([node for node in node_info
                            if node.master_candidate])
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_name)

    result = []

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        if (rrc.failed or not isinstance(rrc.data, (tuple, list))
            or len(rrc.data) != 2):
          self.LogWarning("Node rpc error: %s" % rrc.error)
        elif not rrc.data[0]:
          self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if self.op.node_name != self.cfg.GetMasterNode():
      self.context.ReaddNode(node)

    return result


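# Illustrative example (hypothetical numbers): with candidate_pool_size = 10
# and exactly 10 current master candidates, demoting one node would leave 9,
# so LUSetNodeParams.CheckPrereq above refuses the demotion unless the
# opcode's force flag is set, in which case it only warns.
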
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": cluster.hvparams,
      "beparams": cluster.beparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: a tuple of (disks_ok, device_info), where device_info is a list
      of (node, instance_visible_name, assemble_result) triples describing
      the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if result.failed or not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1)",
                           inst_disk.iv_name, node)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if result.failed or not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2)",
                           inst_disk.iv_name, node)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


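# Illustrative sketch (assumed DRBD-style layout): for a disk mirrored between
# node A (primary) and node B, the two passes above issue roughly
#
#   call_blockdev_assemble(A, disk, iname, False)   # pass 1, secondary mode
#   call_blockdev_assemble(B, disk, iname, False)   # pass 1, secondary mode
#   call_blockdev_assemble(A, disk, iname, True)    # pass 2, primary only
#
# so that both sides exist before either one is switched to primary.
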
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


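# Illustrative note: the force argument of _StartInstanceDisks is reused as
# ignore_secondaries for _AssembleInstanceDisks: False shows the '--force'
# hint on failure, True tolerates secondary-node failures, and None (used by
# the reinstall/rename paths elsewhere in this module) neither ignores
# secondary errors nor prints the hint.
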
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks that the instance is not running before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if ins_l.failed or not isinstance(ins_l.data, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


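# Illustrative usage sketch (hypothetical caller): an LU that only wants to
# tear down disks when the instance is certainly stopped would call
#
#   _SafeShutdownInstanceDisks(self, instance)
#
# and let the OpExecError above propagate if the hypervisor still reports the
# instance as running on its primary node.
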
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  errors on any other node always mark the shutdown as failed.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      if result.failed or not result.data:
        logging.error("Could not shutdown block device %s on node %s",
                      disk.iv_name, node)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


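# Illustrative summary of the ignore_primary handling above (same logic,
# restated): an RPC failure on a secondary node always marks the shutdown as
# failed, while a failure on the primary node is only forgiven when
# ignore_primary is true, e.g. in failover-style paths where the primary node
# may already be unreachable.
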
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  nodeinfo[node].Raise()
  free_mem = nodeinfo[node].data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


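# Illustrative call sketch (hypothetical values): checking that a node can
# host a 512 MiB instance under a Xen PVM hypervisor might look like
#
#   _CheckNodeFreeMemory(self, pnode, "failover to node %s" % pnode,
#                        512, constants.HT_XEN_PVM)
#
# which raises OpPrereqError if memory_free on that node is below 512 MiB.
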
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance, extra_args)
    if result.failed or not result.data:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type, extra_args)
      if result.failed or not result.data:
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
      if result.failed or not result.data:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    if result.failed or not result.data:
      self.proc.LogWarning("Could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      if result.failed or not result.data:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


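# Illustrative note: the rename flow above deliberately updates the cluster
# configuration first; later failures (file storage directory rename, OS
# rename script) are then reported with the caveat that the instance has
# already been renamed in Ganeti, the OS rename script failure being only a
# warning, so the configuration is never left pointing at the old name.
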
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    if result.failed or not result.data:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
2945
  """Logical unit for querying instances.
2946

2947
  """
2948
  _OP_REQP = ["output_fields", "names"]
2949
  REQ_BGL = False
2950
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2951
                                    "admin_state", "admin_ram",
2952
                                    "disk_template", "ip", "mac", "bridge",
2953
                                    "sda_size", "sdb_size", "vcpus", "tags",
2954
                                    "network_port", "beparams",
2955
                                    "(disk).(size)/([0-9]+)",
2956
                                    "(disk).(sizes)",
2957
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2958
                                    "(nic).(macs|ips|bridges)",
2959
                                    "(disk|nic).(count)",
2960
                                    "serial_no", "hypervisor", "hvparams",] +
2961
                                  ["hv/%s" % name
2962
                                   for name in constants.HVS_PARAMETERS] +
2963
                                  ["be/%s" % name
2964
                                   for name in constants.BES_PARAMETERS])
2965
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
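  # The parenthesized entries in _FIELDS_STATIC above are regular
  # expressions: e.g. "disk.size/0" matches "(disk).(size)/([0-9]+)" and
  # selects the size of the first disk, while "nic.macs" returns the list of
  # all NIC MAC addresses; the capture groups are examined in Exec() below.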
2966

    
2967

    
2968
  def ExpandNames(self):
2969
    _CheckOutputFields(static=self._FIELDS_STATIC,
2970
                       dynamic=self._FIELDS_DYNAMIC,
2971
                       selected=self.op.output_fields)
2972

    
2973
    self.needed_locks = {}
2974
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2975
    self.share_locks[locking.LEVEL_NODE] = 1
2976

    
2977
    if self.op.names:
2978
      self.wanted = _GetWantedInstances(self, self.op.names)
2979
    else:
2980
      self.wanted = locking.ALL_SET
2981

    
2982
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2983
    if self.do_locking:
2984
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2985
      self.needed_locks[locking.LEVEL_NODE] = []
2986
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2987

    
2988
  def DeclareLocks(self, level):
2989
    if level == locking.LEVEL_NODE and self.do_locking:
2990
      self._LockInstancesNodes()
2991

    
2992
  def CheckPrereq(self):
2993
    """Check prerequisites.
2994

2995
    """
2996
    pass
2997

    
2998
  def Exec(self, feedback_fn):
2999
    """Computes the list of nodes and their attributes.
3000

3001
    """
3002
    all_info = self.cfg.GetAllInstancesInfo()
3003
    if self.do_locking:
3004
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3005
    elif self.wanted != locking.ALL_SET:
3006
      instance_names = self.wanted
3007
      missing = set(instance_names).difference(all_info.keys())
3008
      if missing:
3009
        raise errors.OpExecError(
3010
          "Some instances were removed before retrieving their data: %s"
3011
          % missing)
3012
    else:
3013
      instance_names = all_info.keys()
3014

    
3015
    instance_names = utils.NiceSort(instance_names)
3016
    instance_list = [all_info[iname] for iname in instance_names]
3017

    
3018
    # begin data gathering
3019

    
3020
    nodes = frozenset([inst.primary_node for inst in instance_list])
3021
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3022

    
3023
    bad_nodes = []
3024
    off_nodes = []
3025
    if self.do_locking:
3026
      live_data = {}
3027
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3028
      for name in nodes:
3029
        result = node_data[name]
3030
        if result.offline:
3031
          # offline nodes will be in both lists
3032
          off_nodes.append(name)
3033
        if result.failed:
3034
          bad_nodes.append(name)
3035
        else:
3036
          if result.data:
3037
            live_data.update(result.data)
3038
            # else no instance is alive
3039
    else:
3040
      live_data = dict([(name, {}) for name in instance_names])
3041

    
3042
    # end data gathering
3043

    
3044
    HVPREFIX = "hv/"
3045
    BEPREFIX = "be/"
3046
    output = []
3047
    for instance in instance_list:
3048
      iout = []
3049
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3050
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3051
      for field in self.op.output_fields:
3052
        st_match = self._FIELDS_STATIC.Matches(field)
3053
        if field == "name":
3054
          val = instance.name
3055
        elif field == "os":
3056
          val = instance.os
3057
        elif field == "pnode":
3058
          val = instance.primary_node
3059
        elif field == "snodes":
3060
          val = list(instance.secondary_nodes)
3061
        elif field == "admin_state":
3062
          val = (instance.status != "down")
3063
        elif field == "oper_state":
3064
          if instance.primary_node in bad_nodes:
3065
            val = None
3066
          else:
3067
            val = bool(live_data.get(instance.name))
3068
        elif field == "status":
3069
          if instance.primary_node in off_nodes:
3070
            val = "ERROR_nodeoffline"
3071
          elif instance.primary_node in bad_nodes:
3072
            val = "ERROR_nodedown"
3073
          else:
3074
            running = bool(live_data.get(instance.name))
3075
            if running:
3076
              if instance.status != "down":
3077
                val = "running"
3078
              else:
3079
                val = "ERROR_up"
3080
            else:
3081
              if instance.status != "down":
3082
                val = "ERROR_down"
3083
              else:
3084
                val = "ADMIN_down"
3085
        elif field == "oper_ram":
3086
          if instance.primary_node in bad_nodes:
3087
            val = None
3088
          elif instance.name in live_data:
3089
            val = live_data[instance.name].get("memory", "?")
3090
          else:
3091
            val = "-"
3092
        elif field == "disk_template":
3093
          val = instance.disk_template
3094
        elif field == "ip":
3095
          val = instance.nics[0].ip
3096
        elif field == "bridge":
3097
          val = instance.nics[0].bridge
3098
        elif field == "mac":
3099
          val = instance.nics[0].mac
3100
        elif field == "sda_size" or field == "sdb_size":
3101
          idx = ord(field[2]) - ord('a')
3102
          try:
3103
            val = instance.FindDisk(idx).size
3104
          except errors.OpPrereqError:
3105
            val = None
3106
        elif field == "tags":
3107
          val = list(instance.GetTags())
3108
        elif field == "serial_no":
3109
          val = instance.serial_no
3110
        elif field == "network_port":
3111
          val = instance.network_port
3112
        elif field == "hypervisor":
3113
          val = instance.hypervisor
3114
        elif field == "hvparams":
3115
          val = i_hv
3116
        elif (field.startswith(HVPREFIX) and
3117
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3118
          val = i_hv.get(field[len(HVPREFIX):], None)
3119
        elif field == "beparams":
3120
          val = i_be
3121
        elif (field.startswith(BEPREFIX) and
3122
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3123
          val = i_be.get(field[len(BEPREFIX):], None)
3124
        elif st_match and st_match.groups():
3125
          # matches a variable list
3126
          st_groups = st_match.groups()
3127
          if st_groups and st_groups[0] == "disk":
3128
            if st_groups[1] == "count":
3129
              val = len(instance.disks)
3130
            elif st_groups[1] == "sizes":
3131
              val = [disk.size for disk in instance.disks]
3132
            elif st_groups[1] == "size":
3133
              try:
3134
                val = instance.FindDisk(st_groups[2]).size
3135
              except errors.OpPrereqError:
3136
                val = None
3137
            else:
3138
              assert False, "Unhandled disk parameter"
3139
          elif st_groups[0] == "nic":
3140
            if st_groups[1] == "count":
3141
              val = len(instance.nics)
3142
            elif st_groups[1] == "macs":
3143
              val = [nic.mac for nic in instance.nics]
3144
            elif st_groups[1] == "ips":
3145
              val = [nic.ip for nic in instance.nics]
3146
            elif st_groups[1] == "bridges":
3147
              val = [nic.bridge for nic in instance.nics]
3148
            else:
3149
              # index-based item
3150
              nic_idx = int(st_groups[2])
3151
              if nic_idx >= len(instance.nics):
3152
                val = None
3153
              else:
3154
                if st_groups[1] == "mac":
3155
                  val = instance.nics[nic_idx].mac
3156
                elif st_groups[1] == "ip":
3157
                  val = instance.nics[nic_idx].ip
3158
                elif st_groups[1] == "bridge":
3159
                  val = instance.nics[nic_idx].bridge
3160
                else:
3161
                  assert False, "Unhandled NIC parameter"
3162
          else:
3163
            assert False, "Unhandled variable parameter"
3164
        else:
3165
          raise errors.ParameterError(field)
3166
        iout.append(val)
3167
      output.append(iout)
3168

    
3169
    return output
3170

    
3171

    
3172
class LUFailoverInstance(LogicalUnit):
3173
  """Failover an instance.
3174

3175
  """
3176
  HPATH = "instance-failover"
3177
  HTYPE = constants.HTYPE_INSTANCE
3178
  _OP_REQP = ["instance_name", "ignore_consistency"]
3179
  REQ_BGL = False
3180

    
3181
  def ExpandNames(self):
3182
    self._ExpandAndLockInstance()
3183
    self.needed_locks[locking.LEVEL_NODE] = []
3184
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3185

    
3186
  def DeclareLocks(self, level):
3187
    if level == locking.LEVEL_NODE:
3188
      self._LockInstancesNodes()
3189

    
3190
  def BuildHooksEnv(self):
3191
    """Build hooks env.
3192

3193
    This runs on master, primary and secondary nodes of the instance.
3194

3195
    """
3196
    env = {
3197
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3198
      }
3199
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3200
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3201
    return env, nl, nl
3202

    
3203
  def CheckPrereq(self):
3204
    """Check prerequisites.
3205

3206
    This checks that the instance is in the cluster.
3207

3208
    """
3209
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3210
    assert self.instance is not None, \
3211
      "Cannot retrieve locked instance %s" % self.op.instance_name
3212

    
3213
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3214
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3215
      raise errors.OpPrereqError("Instance's disk layout is not"
3216
                                 " network mirrored, cannot failover.")
3217

    
3218
    secondary_nodes = instance.secondary_nodes
3219
    if not secondary_nodes:
3220
      raise errors.ProgrammerError("no secondary node but using "
3221
                                   "a mirrored disk template")
3222

    
3223
    target_node = secondary_nodes[0]
3224
    # check memory requirements on the secondary node
3225
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3226
                         instance.name, bep[constants.BE_MEMORY],
3227
                         instance.hypervisor)
3228

    
3229
    # check bridge existence
3230
    brlist = [nic.bridge for nic in instance.nics]
3231
    result = self.rpc.call_bridges_exist(target_node, brlist)
3232
    result.Raise()
3233
    if not result.data:
3234
      raise errors.OpPrereqError("One or more target bridges %s does not"
3235
                                 " exist on destination node '%s'" %
3236
                                 (brlist, target_node))
3237

    
3238
  def Exec(self, feedback_fn):
3239
    """Failover an instance.
3240

3241
    The failover is done by shutting it down on its present node and
3242
    starting it on the secondary.
3243

3244
    """
3245
    instance = self.instance
3246

    
3247
    source_node = instance.primary_node
3248
    target_node = instance.secondary_nodes[0]
3249

    
3250
    feedback_fn("* checking disk consistency between source and target")
3251
    for dev in instance.disks:
3252
      # for drbd, these are drbd over lvm
3253
      if not _CheckDiskConsistency(self, dev, target_node, False):
3254
        if instance.status == "up" and not self.op.ignore_consistency:
3255
          raise errors.OpExecError("Disk %s is degraded on target node,"
3256
                                   " aborting failover." % dev.iv_name)
3257

    
3258
    feedback_fn("* shutting down instance on source node")
3259
    logging.info("Shutting down instance %s on node %s",
3260
                 instance.name, source_node)
3261

    
3262
    result = self.rpc.call_instance_shutdown(source_node, instance)
3263
    if result.failed or not result.data:
3264
      if self.op.ignore_consistency:
3265
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3266
                             " Proceeding"
3267
                             " anyway. Please make sure node %s is down",
3268
                             instance.name, source_node, source_node)
3269
      else:
3270
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3271
                                 (instance.name, source_node))
3272

    
3273
    feedback_fn("* deactivating the instance's disks on source node")
3274
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3275
      raise errors.OpExecError("Can't shut down the instance's disks.")
3276

    
3277
    instance.primary_node = target_node
3278
    # distribute new instance config to the other nodes
3279
    self.cfg.Update(instance)
3280

    
3281
    # Only start the instance if it's marked as up
3282
    if instance.status == "up":
3283
      feedback_fn("* activating the instance's disks on target node")
3284
      logging.info("Starting instance %s on node %s",
3285
                   instance.name, target_node)
3286

    
3287
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3288
                                               ignore_secondaries=True)
3289
      if not disks_ok:
3290
        _ShutdownInstanceDisks(self, instance)
3291
        raise errors.OpExecError("Can't activate the instance's disks")
3292

    
3293
      feedback_fn("* starting the instance on the target node")
3294
      result = self.rpc.call_instance_start(target_node, instance, None)
3295
      if result.failed or not result.data:
3296
        _ShutdownInstanceDisks(self, instance)
3297
        raise errors.OpExecError("Could not start instance %s on node %s." %
3298
                                 (instance.name, target_node))
3299

    
3300

    
3301
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3302
  """Create a tree of block devices on the primary node.
3303

3304
  This always creates all devices.
3305

3306
  """
3307
  if device.children:
3308
    for child in device.children:
3309
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3310
        return False
3311

    
3312
  lu.cfg.SetDiskID(device, node)
3313
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3314
                                       instance.name, True, info)
3315
  if new_id.failed or not new_id.data:
3316
    return False
3317
  if device.physical_id is None:
3318
    device.physical_id = new_id
3319
  return True
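# Note that the recursion above descends into device.children before creating
# the device itself, so e.g. the backing LVs of a DRBD8 disk are created
# before the DRBD device that sits on top of them.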
3320

    
3321

    
3322
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3323
  """Create a tree of block devices on a secondary node.
3324

3325
  If this device type has to be created on secondaries, create it and
3326
  all its children.
3327

3328
  If not, just recurse to children keeping the same 'force' value.
3329

3330
  """
3331
  if device.CreateOnSecondary():
3332
    force = True
3333
  if device.children:
3334
    for child in device.children:
3335
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3336
                                        child, force, info):
3337
        return False
3338

    
3339
  if not force:
3340
    return True
3341
  lu.cfg.SetDiskID(device, node)
3342
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3343
                                       instance.name, False, info)
3344
  if new_id.failed or not new_id.data:
3345
    return False
3346
  if device.physical_id is None:
3347
    device.physical_id = new_id
3348
  return True
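# Note that 'force' is switched on as soon as a device type reports
# CreateOnSecondary() (e.g. a network-mirrored device such as DRBD8) and is
# then inherited by its children, so their backing LVs are created on the
# secondary as well; below a non-forcing parent the tree is only recursed.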
3349

    
3350

    
3351
def _GenerateUniqueNames(lu, exts):
3352
  """Generate a suitable LV name.
3353

3354
  This generates one unique logical volume name for each given extension.
3355

3356
  """
3357
  results = []
3358
  for val in exts:
3359
    new_id = lu.cfg.GenerateUniqueID()
3360
    results.append("%s%s" % (new_id, val))
3361
  return results
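# Example (with a hypothetical unique ID): _GenerateUniqueNames(lu,
# [".disk0_data", ".disk0_meta"]) could return something like
# ["<uuid>.disk0_data", "<uuid>.disk0_meta"], where <uuid> comes from
# lu.cfg.GenerateUniqueID().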
3362

    
3363

    
3364
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3365
                         p_minor, s_minor):
3366
  """Generate a drbd8 device complete with its children.
3367

3368
  """
3369
  port = lu.cfg.AllocatePort()
3370
  vgname = lu.cfg.GetVGName()
3371
  shared_secret = lu.cfg.GenerateDRBDSecret()
3372
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3373
                          logical_id=(vgname, names[0]))
3374
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3375
                          logical_id=(vgname, names[1]))
3376
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3377
                          logical_id=(primary, secondary, port,
3378
                                      p_minor, s_minor,
3379
                                      shared_secret),
3380
                          children=[dev_data, dev_meta],
3381
                          iv_name=iv_name)
3382
  return drbd_dev
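# The device returned above is a DRBD8 disk with two LV children, roughly:
#   drbd8 (primary, secondary, port, p_minor, s_minor, secret)
#     |- lv data (size MB, names[0])
#     `- lv meta (128 MB,  names[1])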
3383

    
3384

    
3385
def _GenerateDiskTemplate(lu, template_name,
3386
                          instance_name, primary_node,
3387
                          secondary_nodes, disk_info,
3388
                          file_storage_dir, file_driver,
3389
                          base_index):
3390
  """Generate the entire disk layout for a given template type.
3391

3392
  """
3393
  #TODO: compute space requirements
3394

    
3395
  vgname = lu.cfg.GetVGName()
3396
  disk_count = len(disk_info)
3397
  disks = []
3398
  if template_name == constants.DT_DISKLESS:
3399
    pass
3400
  elif template_name == constants.DT_PLAIN:
3401
    if len(secondary_nodes) != 0:
3402
      raise errors.ProgrammerError("Wrong template configuration")
3403

    
3404
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3405
                                      for i in range(disk_count)])
3406
    for idx, disk in enumerate(disk_info):
3407
      disk_index = idx + base_index
3408
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3409
                              logical_id=(vgname, names[idx]),
3410
                              iv_name="disk/%d" % disk_index)
3411
      disks.append(disk_dev)
3412
  elif template_name == constants.DT_DRBD8:
3413
    if len(secondary_nodes) != 1:
3414
      raise errors.ProgrammerError("Wrong template configuration")
3415
    remote_node = secondary_nodes[0]
3416
    minors = lu.cfg.AllocateDRBDMinor(
3417
      [primary_node, remote_node] * len(disk_info), instance_name)
3418

    
3419
    names = _GenerateUniqueNames(lu,
3420
                                 [".disk%d_%s" % (i, s)
3421
                                  for i in range(disk_count)
3422
                                  for s in ("data", "meta")
3423
                                  ])
3424
    for idx, disk in enumerate(disk_info):
3425
      disk_index = idx + base_index
3426
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3427
                                      disk["size"], names[idx*2:idx*2+2],
3428
                                      "disk/%d" % disk_index,
3429
                                      minors[idx*2], minors[idx*2+1])
3430
      disks.append(disk_dev)
3431
  elif template_name == constants.DT_FILE:
3432
    if len(secondary_nodes) != 0:
3433
      raise errors.ProgrammerError("Wrong template configuration")
3434

    
3435
    for idx, disk in enumerate(disk_info):
3436
      disk_index = idx + base_index
3437
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3438
                              iv_name="disk/%d" % disk_index,
3439
                              logical_id=(file_driver,
3440
                                          "%s/disk%d" % (file_storage_dir,
3441
                                                         idx)))
3442
      disks.append(disk_dev)
3443
  else:
3444
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3445
  return disks
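# For example, a two-disk drbd instance gets the iv_names "disk/0" and
# "disk/1" (shifted by base_index when disks are added to an existing
# instance), each backed by a _GenerateDRBD8Branch device tree.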
3446

    
3447

    
3448
def _GetInstanceInfoText(instance):
3449
  """Compute that text that should be added to the disk's metadata.
3450

3451
  """
3452
  return "originstname+%s" % instance.name
3453

    
3454

    
3455
def _CreateDisks(lu, instance):
3456
  """Create all disks for an instance.
3457

3458
  This abstracts away some work from AddInstance.
3459

3460
  @type lu: L{LogicalUnit}
3461
  @param lu: the logical unit on whose behalf we execute
3462
  @type instance: L{objects.Instance}
3463
  @param instance: the instance whose disks we should create
3464
  @rtype: boolean
3465
  @return: the success of the creation
3466

3467
  """
3468
  info = _GetInstanceInfoText(instance)
3469

    
3470
  if instance.disk_template == constants.DT_FILE:
3471
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3472
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3473
                                                 file_storage_dir)
3474

    
3475
    if result.failed or not result.data:
3476
      logging.error("Could not connect to node '%s'", instance.primary_node)
3477
      return False
3478

    
3479
    if not result.data[0]:
3480
      logging.error("Failed to create directory '%s'", file_storage_dir)
3481
      return False
3482

    
3483
  # Note: this needs to be kept in sync with adding of disks in
3484
  # LUSetInstanceParams
3485
  for device in instance.disks:
3486
    logging.info("Creating volume %s for instance %s",
3487
                 device.iv_name, instance.name)
3488
    #HARDCODE
3489
    for secondary_node in instance.secondary_nodes:
3490
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3491
                                        device, False, info):
3492
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3493
                      device.iv_name, device, secondary_node)
3494
        return False
3495
    #HARDCODE
3496
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3497
                                    instance, device, info):
3498
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3499
      return False
3500

    
3501
  return True
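# Disk creation is not rolled back here on partial failure; callers such as
# LUCreateInstance.Exec are expected to invoke _RemoveDisks() themselves when
# this function returns False.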
3502

    
3503

    
3504
def _RemoveDisks(lu, instance):
3505
  """Remove all disks for an instance.
3506

3507
  This abstracts away some work from `AddInstance()` and
3508
  `RemoveInstance()`. Note that in case some of the devices couldn't
3509
  be removed, the removal will continue with the other ones (compare
3510
  with `_CreateDisks()`).
3511

3512
  @type lu: L{LogicalUnit}
3513
  @param lu: the logical unit on whose behalf we execute
3514
  @type instance: L{objects.Instance}
3515
  @param instance: the instance whose disks we should remove
3516
  @rtype: boolean
3517
  @return: the success of the removal
3518

3519
  """
3520
  logging.info("Removing block devices for instance %s", instance.name)
3521

    
3522
  result = True
3523
  for device in instance.disks:
3524
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3525
      lu.cfg.SetDiskID(disk, node)
3526
      result = lu.rpc.call_blockdev_remove(node, disk)
3527
      if result.failed or not result.data:
3528
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3529
                           " continuing anyway", device.iv_name, node)
3530
        result = False
3531

    
3532
  if instance.disk_template == constants.DT_FILE:
3533
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3534
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3535
                                                 file_storage_dir)
3536
    if result.failed or not result.data:
3537
      logging.error("Could not remove directory '%s'", file_storage_dir)
3538
      result = False
3539

    
3540
  return result
3541

    
3542

    
3543
def _ComputeDiskSize(disk_template, disks):
3544
  """Compute disk size requirements in the volume group
3545

3546
  """
3547
  # Required free disk space as a function of disk template and disk sizes
3548
  req_size_dict = {
3549
    constants.DT_DISKLESS: None,
3550
    constants.DT_PLAIN: sum(d["size"] for d in disks),
3551
    # 128 MB are added for drbd metadata for each disk
3552
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3553
    constants.DT_FILE: None,
3554
  }
3555

    
3556
  if disk_template not in req_size_dict:
3557
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3558
                                 " is unknown" %  disk_template)
3559

    
3560
  return req_size_dict[disk_template]
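# For example, two 1024 MB disks need 2048 MB of free VG space with the
# 'plain' template and 2304 MB (2 * (1024 + 128)) with 'drbd', while the
# diskless and file templates need no volume group space at all.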
3561

    
3562

    
3563
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3564
  """Hypervisor parameter validation.
3565

3566
  This function abstracts the hypervisor parameter validation to be
3567
  used in both instance create and instance modify.
3568

3569
  @type lu: L{LogicalUnit}
3570
  @param lu: the logical unit for which we check
3571
  @type nodenames: list
3572
  @param nodenames: the list of nodes on which we should check
3573
  @type hvname: string
3574
  @param hvname: the name of the hypervisor we should use
3575
  @type hvparams: dict
3576
  @param hvparams: the parameters which we need to check
3577
  @raise errors.OpPrereqError: if the parameters are not valid
3578

3579
  """
3580
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3581
                                                  hvname,
3582
                                                  hvparams)
3583
  for node in nodenames:
3584
    info = hvinfo[node]
3585
    info.Raise()
3586
    if not info.data or not isinstance(info.data, (tuple, list)):
3587
      raise errors.OpPrereqError("Cannot get current information"
3588
                                 " from node '%s' (%s)" % (node, info.data))
3589
    if not info.data[0]:
3590
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3591
                                 " %s" % info.data[1])
3592

    
3593

    
3594
class LUCreateInstance(LogicalUnit):
3595
  """Create an instance.
3596

3597
  """
3598
  HPATH = "instance-add"
3599
  HTYPE = constants.HTYPE_INSTANCE
3600
  _OP_REQP = ["instance_name", "disks", "disk_template",
3601
              "mode", "start",
3602
              "wait_for_sync", "ip_check", "nics",
3603
              "hvparams", "beparams"]
3604
  REQ_BGL = False
3605

    
3606
  def _ExpandNode(self, node):
3607
    """Expands and checks one node name.
3608

3609
    """
3610
    node_full = self.cfg.ExpandNodeName(node)
3611
    if node_full is None:
3612
      raise errors.OpPrereqError("Unknown node %s" % node)
3613
    return node_full
3614

    
3615
  def ExpandNames(self):
3616
    """ExpandNames for CreateInstance.
3617

3618
    Figure out the right locks for instance creation.
3619

3620
    """
3621
    self.needed_locks = {}
3622

    
3623
    # set optional parameters to none if they don't exist
3624
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3625
      if not hasattr(self.op, attr):
3626
        setattr(self.op, attr, None)
3627

    
3628
    # cheap checks, mostly valid constants given
3629

    
3630
    # verify creation mode
3631
    if self.op.mode not in (constants.INSTANCE_CREATE,
3632
                            constants.INSTANCE_IMPORT):
3633
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3634
                                 self.op.mode)
3635

    
3636
    # disk template and mirror node verification
3637
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3638
      raise errors.OpPrereqError("Invalid disk template name")
3639

    
3640
    if self.op.hypervisor is None:
3641
      self.op.hypervisor = self.cfg.GetHypervisorType()
3642

    
3643
    cluster = self.cfg.GetClusterInfo()
3644
    enabled_hvs = cluster.enabled_hypervisors
3645
    if self.op.hypervisor not in enabled_hvs:
3646
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3647
                                 " cluster (%s)" % (self.op.hypervisor,
3648
                                  ",".join(enabled_hvs)))
3649

    
3650
    # check hypervisor parameter syntax (locally)
3651

    
3652
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3653
                                  self.op.hvparams)
3654
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3655
    hv_type.CheckParameterSyntax(filled_hvp)
3656

    
3657
    # fill and remember the beparams dict
3658
    utils.CheckBEParams(self.op.beparams)
3659
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3660
                                    self.op.beparams)
3661

    
3662
    #### instance parameters check
3663

    
3664
    # instance name verification
3665
    hostname1 = utils.HostInfo(self.op.instance_name)
3666
    self.op.instance_name = instance_name = hostname1.name
3667

    
3668
    # this is just a preventive check, but someone might still add this
3669
    # instance in the meantime, and creation will fail at lock-add time
3670
    if instance_name in self.cfg.GetInstanceList():
3671
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3672
                                 instance_name)
3673

    
3674
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3675

    
3676
    # NIC buildup
3677
    self.nics = []
3678
    for nic in self.op.nics:
3679
      # ip validity checks
3680
      ip = nic.get("ip", None)
3681
      if ip is None or ip.lower() == "none":
3682
        nic_ip = None
3683
      elif ip.lower() == constants.VALUE_AUTO:
3684
        nic_ip = hostname1.ip
3685
      else:
3686
        if not utils.IsValidIP(ip):
3687
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3688
                                     " like a valid IP" % ip)
3689
        nic_ip = ip
3690

    
3691
      # MAC address verification
3692
      mac = nic.get("mac", constants.VALUE_AUTO)
3693
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3694
        if not utils.IsValidMac(mac.lower()):
3695
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3696
                                     mac)
3697
      # bridge verification
3698
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3699
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3700

    
3701
    # disk checks/pre-build
3702
    self.disks = []
3703
    for disk in self.op.disks:
3704
      mode = disk.get("mode", constants.DISK_RDWR)
3705
      if mode not in constants.DISK_ACCESS_SET:
3706
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3707
                                   mode)
3708
      size = disk.get("size", None)
3709
      if size is None:
3710
        raise errors.OpPrereqError("Missing disk size")
3711
      try:
3712
        size = int(size)
3713
      except ValueError:
3714
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3715
      self.disks.append({"size": size, "mode": mode})
3716

    
3717
    # used in CheckPrereq for ip ping check
3718
    self.check_ip = hostname1.ip
3719

    
3720
    # file storage checks
3721
    if (self.op.file_driver and
3722
        not self.op.file_driver in constants.FILE_DRIVER):
3723
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3724
                                 self.op.file_driver)
3725

    
3726
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3727
      raise errors.OpPrereqError("File storage directory path not absolute")
3728

    
3729
    ### Node/iallocator related checks
3730
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3731
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3732
                                 " node must be given")
3733

    
3734
    if self.op.iallocator:
3735
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3736
    else:
3737
      self.op.pnode = self._ExpandNode(self.op.pnode)
3738
      nodelist = [self.op.pnode]
3739
      if self.op.snode is not None:
3740
        self.op.snode = self._ExpandNode(self.op.snode)
3741
        nodelist.append(self.op.snode)
3742
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3743

    
3744
    # in case of import lock the source node too
3745
    if self.op.mode == constants.INSTANCE_IMPORT:
3746
      src_node = getattr(self.op, "src_node", None)
3747
      src_path = getattr(self.op, "src_path", None)
3748

    
3749
      if src_path is None:
3750
        self.op.src_path = src_path = self.op.instance_name
3751

    
3752
      if src_node is None:
3753
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3754
        self.op.src_node = None
3755
        if os.path.isabs(src_path):
3756
          raise errors.OpPrereqError("Importing an instance from an absolute"
3757
                                     " path requires a source node option.")
3758
      else:
3759
        self.op.src_node = src_node = self._ExpandNode(src_node)
3760
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3761
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
3762
        if not os.path.isabs(src_path):
3763
          self.op.src_path = src_path = \
3764
            os.path.join(constants.EXPORT_DIR, src_path)
3765

    
3766
    else: # INSTANCE_CREATE
3767
      if getattr(self.op, "os_type", None) is None:
3768
        raise errors.OpPrereqError("No guest OS specified")
3769

    
3770
  def _RunAllocator(self):
3771
    """Run the allocator based on input opcode.
3772

3773
    """
3774
    nics = [n.ToDict() for n in self.nics]
3775
    ial = IAllocator(self,
3776
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3777
                     name=self.op.instance_name,
3778
                     disk_template=self.op.disk_template,
3779
                     tags=[],
3780
                     os=self.op.os_type,
3781
                     vcpus=self.be_full[constants.BE_VCPUS],
3782
                     mem_size=self.be_full[constants.BE_MEMORY],
3783
                     disks=self.disks,
3784
                     nics=nics,
3785
                     hypervisor=self.op.hypervisor,
3786
                     )
3787

    
3788
    ial.Run(self.op.iallocator)
3789

    
3790
    if not ial.success:
3791
      raise errors.OpPrereqError("Can't compute nodes using"
3792
                                 " iallocator '%s': %s" % (self.op.iallocator,
3793
                                                           ial.info))
3794
    if len(ial.nodes) != ial.required_nodes:
3795
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3796
                                 " of nodes (%s), required %s" %
3797
                                 (self.op.iallocator, len(ial.nodes),
3798
                                  ial.required_nodes))
3799
    self.op.pnode = ial.nodes[0]
3800
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3801
                 self.op.instance_name, self.op.iallocator,
3802
                 ", ".join(ial.nodes))
3803
    if ial.required_nodes == 2:
3804
      self.op.snode = ial.nodes[1]
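    # When the allocator is asked for two nodes (mirrored disk templates),
    # the first returned node becomes the primary and the second the
    # secondary, mirroring the manual pnode/snode selection path.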
3805

    
3806
  def BuildHooksEnv(self):
3807
    """Build hooks env.
3808

3809
    This runs on master, primary and secondary nodes of the instance.
3810

3811
    """
3812
    env = {
3813
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3814
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3815
      "INSTANCE_ADD_MODE": self.op.mode,
3816
      }
3817
    if self.op.mode == constants.INSTANCE_IMPORT:
3818
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3819
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3820
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3821

    
3822
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3823
      primary_node=self.op.pnode,
3824
      secondary_nodes=self.secondaries,
3825
      status=self.instance_status,
3826
      os_type=self.op.os_type,
3827
      memory=self.be_full[constants.BE_MEMORY],
3828
      vcpus=self.be_full[constants.BE_VCPUS],
3829
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3830
    ))
3831

    
3832
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3833
          self.secondaries)
3834
    return env, nl, nl
3835

    
3836

    
3837
  def CheckPrereq(self):
3838
    """Check prerequisites.
3839

3840
    """
3841
    if (not self.cfg.GetVGName() and
3842
        self.op.disk_template not in constants.DTS_NOT_LVM):
3843
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3844
                                 " instances")
3845

    
3846

    
3847
    if self.op.mode == constants.INSTANCE_IMPORT:
3848
      src_node = self.op.src_node
3849
      src_path = self.op.src_path
3850

    
3851
      if src_node is None:
3852
        exp_list = self.rpc.call_export_list(
3853
          self.acquired_locks[locking.LEVEL_NODE])
3854
        found = False
3855
        for node in exp_list:
3856
          if not exp_list[node].failed and src_path in exp_list[node].data:
3857
            found = True
3858
            self.op.src_node = src_node = node
3859
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3860
                                                       src_path)
3861
            break
3862
        if not found:
3863
          raise errors.OpPrereqError("No export found for relative path %s" %
3864
                                      src_path)
3865

    
3866
      result = self.rpc.call_export_info(src_node, src_path)
3867
      result.Raise()
3868
      if not result.data:
3869
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3870

    
3871
      export_info = result.data
3872
      if not export_info.has_section(constants.INISECT_EXP):
3873
        raise errors.ProgrammerError("Corrupted export config")
3874

    
3875
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3876
      if (int(ei_version) != constants.EXPORT_VERSION):
3877
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3878
                                   (ei_version, constants.EXPORT_VERSION))
3879

    
3880
      # Check that the new instance doesn't have less disks than the export
3881
      instance_disks = len(self.disks)
3882
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3883
      if instance_disks < export_disks:
3884
        raise errors.OpPrereqError("Not enough disks to import."
3885
                                   " (instance: %d, export: %d)" %
3886
                                   (instance_disks, export_disks))
3887

    
3888
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3889
      disk_images = []
3890
      for idx in range(export_disks):
3891
        option = 'disk%d_dump' % idx
3892
        if export_info.has_option(constants.INISECT_INS, option):
3893
          # FIXME: are the old os-es, disk sizes, etc. useful?
3894
          export_name = export_info.get(constants.INISECT_INS, option)
3895
          image = os.path.join(src_path, export_name)
3896
          disk_images.append(image)
3897
        else:
3898
          disk_images.append(False)
3899

    
3900
      self.src_images = disk_images
3901

    
3902
      old_name = export_info.get(constants.INISECT_INS, 'name')
3903
      # FIXME: int() here could throw a ValueError on broken exports
3904
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3905
      if self.op.instance_name == old_name:
3906
        for idx, nic in enumerate(self.nics):
3907
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx + 1:
3908
            nic_mac_ini = 'nic%d_mac' % idx
3909
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3910

    
3911
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3912
    if self.op.start and not self.op.ip_check:
3913
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3914
                                 " adding an instance in start mode")
3915

    
3916
    if self.op.ip_check:
3917
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3918
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3919
                                   (self.check_ip, self.op.instance_name))
3920

    
3921
    #### allocator run
3922

    
3923
    if self.op.iallocator is not None:
3924
      self._RunAllocator()
3925

    
3926
    #### node related checks
3927

    
3928
    # check primary node
3929
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3930
    assert self.pnode is not None, \
3931
      "Cannot retrieve locked node %s" % self.op.pnode
3932
    self.secondaries = []
3933

    
3934
    # mirror node verification
3935
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3936
      if self.op.snode is None:
3937
        raise errors.OpPrereqError("The networked disk templates need"
3938
                                   " a mirror node")
3939
      if self.op.snode == pnode.name:
3940
        raise errors.OpPrereqError("The secondary node cannot be"
3941
                                   " the primary node.")
3942
      self.secondaries.append(self.op.snode)
3943

    
3944
    nodenames = [pnode.name] + self.secondaries
3945

    
3946
    req_size = _ComputeDiskSize(self.op.disk_template,
3947
                                self.disks)
3948

    
3949
    # Check lv size requirements
3950
    if req_size is not None:
3951
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3952
                                         self.op.hypervisor)
3953
      for node in nodenames:
3954
        info = nodeinfo[node]
3955
        info.Raise()
3956
        info = info.data
3957
        if not info:
3958
          raise errors.OpPrereqError("Cannot get current information"
3959
                                     " from node '%s'" % node)
3960
        vg_free = info.get('vg_free', None)
3961
        if not isinstance(vg_free, int):
3962
          raise errors.OpPrereqError("Can't compute free disk space on"
3963
                                     " node %s" % node)
3964
        if req_size > info['vg_free']:
3965
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3966
                                     " %d MB available, %d MB required" %
3967
                                     (node, info['vg_free'], req_size))
3968

    
3969
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3970

    
3971
    # os verification
3972
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3973
    result.Raise()
3974
    if not isinstance(result.data, objects.OS):
3975
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3976
                                 " primary node"  % self.op.os_type)
3977

    
3978
    # bridge check on primary node
3979
    bridges = [n.bridge for n in self.nics]
3980
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3981
    result.Raise()
3982
    if not result.data:
3983
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
3984
                                 " exist on destination node '%s'" %
3985
                                 (",".join(bridges), pnode.name))
3986

    
3987
    # memory check on primary node
3988
    if self.op.start:
3989
      _CheckNodeFreeMemory(self, self.pnode.name,
3990
                           "creating instance %s" % self.op.instance_name,
3991
                           self.be_full[constants.BE_MEMORY],
3992
                           self.op.hypervisor)
3993

    
3994
    if self.op.start:
3995
      self.instance_status = 'up'
3996
    else:
3997
      self.instance_status = 'down'
3998

    
3999
  def Exec(self, feedback_fn):
4000
    """Create and add the instance to the cluster.
4001

4002
    """
4003
    instance = self.op.instance_name
4004
    pnode_name = self.pnode.name
4005

    
4006
    for nic in self.nics:
4007
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4008
        nic.mac = self.cfg.GenerateMAC()
4009

    
4010
    ht_kind = self.op.hypervisor
4011
    if ht_kind in constants.HTS_REQ_PORT:
4012
      network_port = self.cfg.AllocatePort()
4013
    else:
4014
      network_port = None
4015

    
4016
    ##if self.op.vnc_bind_address is None:
4017
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4018

    
4019
    # this is needed because os.path.join does not accept None arguments
4020
    if self.op.file_storage_dir is None:
4021
      string_file_storage_dir = ""
4022
    else:
4023
      string_file_storage_dir = self.op.file_storage_dir
4024

    
4025
    # build the full file storage dir path
4026
    file_storage_dir = os.path.normpath(os.path.join(
4027
                                        self.cfg.GetFileStorageDir(),
4028
                                        string_file_storage_dir, instance))
4029

    
4030

    
4031
    disks = _GenerateDiskTemplate(self,
4032
                                  self.op.disk_template,
4033
                                  instance, pnode_name,
4034
                                  self.secondaries,
4035
                                  self.disks,
4036
                                  file_storage_dir,
4037
                                  self.op.file_driver,
4038
                                  0)
4039

    
4040
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4041
                            primary_node=pnode_name,
4042
                            nics=self.nics, disks=disks,
4043
                            disk_template=self.op.disk_template,
4044
                            status=self.instance_status,
4045
                            network_port=network_port,
4046
                            beparams=self.op.beparams,
4047
                            hvparams=self.op.hvparams,
4048
                            hypervisor=self.op.hypervisor,
4049
                            )
4050

    
4051
    feedback_fn("* creating instance disks...")
4052
    if not _CreateDisks(self, iobj):
4053
      _RemoveDisks(self, iobj)
4054
      self.cfg.ReleaseDRBDMinors(instance)
4055
      raise errors.OpExecError("Device creation failed, reverting...")
4056

    
4057
    feedback_fn("adding instance %s to cluster config" % instance)
4058

    
4059
    self.cfg.AddInstance(iobj)
4060
    # Declare that we don't want to remove the instance lock anymore, as we've
4061
    # added the instance to the config
4062
    del self.remove_locks[locking.LEVEL_INSTANCE]
4063
    # Remove the temporary assignments for the instance's DRBDs
4064
    self.cfg.ReleaseDRBDMinors(instance)
4065
    # Unlock all the nodes
4066
    if self.op.mode == constants.INSTANCE_IMPORT:
4067
      nodes_keep = [self.op.src_node]
4068
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4069
                       if node != self.op.src_node]
4070
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4071
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4072
    else:
4073
      self.context.glm.release(locking.LEVEL_NODE)
4074
      del self.acquired_locks[locking.LEVEL_NODE]
4075

    
4076
    if self.op.wait_for_sync:
4077
      disk_abort = not _WaitForSync(self, iobj)
4078
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4079
      # make sure the disks are not degraded (still sync-ing is ok)
4080
      time.sleep(15)
4081
      feedback_fn("* checking mirrors status")
4082
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4083
    else:
4084
      disk_abort = False
4085

    
4086
    if disk_abort:
4087
      _RemoveDisks(self, iobj)
4088
      self.cfg.RemoveInstance(iobj.name)
4089
      # Make sure the instance lock gets removed
4090
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4091
      raise errors.OpExecError("There are some degraded disks for"
4092
                               " this instance")
4093

    
4094
    feedback_fn("creating os for instance %s on node %s" %
4095
                (instance, pnode_name))
4096

    
4097
    if iobj.disk_template != constants.DT_DISKLESS:
4098
      if self.op.mode == constants.INSTANCE_CREATE:
4099
        feedback_fn("* running the instance OS create scripts...")
4100
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4101
        result.Raise()
4102
        if not result.data:
4103
          raise errors.OpExecError("Could not add os for instance %s"
4104
                                   " on node %s" %
4105
                                   (instance, pnode_name))
4106

    
4107
      elif self.op.mode == constants.INSTANCE_IMPORT:
4108
        feedback_fn("* running the instance OS import scripts...")
4109
        src_node = self.op.src_node
4110
        src_images = self.src_images
4111
        cluster_name = self.cfg.GetClusterName()
4112
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4113
                                                         src_node, src_images,
4114
                                                         cluster_name)
4115
        import_result.Raise()
4116
        for idx, result in enumerate(import_result.data):
4117
          if not result:
4118
            self.LogWarning("Could not import the image %s for instance"
4119
                            " %s, disk %d, on node %s" %
4120
                            (src_images[idx], instance, idx, pnode_name))
4121
      else:
4122
        # also checked in the prereq part
4123
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4124
                                     % self.op.mode)
4125

    
4126
    if self.op.start:
4127
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4128
      feedback_fn("* starting instance...")
4129
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
4130
      result.Raise()
4131
      if not result.data:
4132
        raise errors.OpExecError("Could not start instance")
4133

    
4134

    
4135
class LUConnectConsole(NoHooksLU):
4136
  """Connect to an instance's console.
4137

4138
  This is somewhat special in that it returns the command line that
4139
  you need to run on the master node in order to connect to the
4140
  console.
4141

4142
  """
4143
  _OP_REQP = ["instance_name"]
4144
  REQ_BGL = False
4145

    
4146
  def ExpandNames(self):
4147
    self._ExpandAndLockInstance()
4148

    
4149
  def CheckPrereq(self):
4150
    """Check prerequisites.
4151

4152
    This checks that the instance is in the cluster.
4153

4154
    """
4155
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4156
    assert self.instance is not None, \
4157
      "Cannot retrieve locked instance %s" % self.op.instance_name
4158

    
4159
  def Exec(self, feedback_fn):
4160
    """Connect to the console of an instance
4161

4162
    """
4163
    instance = self.instance
4164
    node = instance.primary_node
4165

    
4166
    node_insts = self.rpc.call_instance_list([node],
4167
                                             [instance.hypervisor])[node]
4168
    node_insts.Raise()
4169

    
4170
    if instance.name not in node_insts.data:
4171
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4172

    
4173
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4174

    
4175
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4176
    console_cmd = hyper.GetShellCommandForConsole(instance)
4177

    
4178
    # build ssh cmdline
4179
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
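    # The result is an interactive ssh command line (root login, tty, batch
    # mode) to be run on the master node; the console command wrapped inside
    # it is hypervisor-specific.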
4180

    
4181

    
4182
class LUReplaceDisks(LogicalUnit):
4183
  """Replace the disks of an instance.
4184

4185
  """
4186
  HPATH = "mirrors-replace"
4187
  HTYPE = constants.HTYPE_INSTANCE
4188
  _OP_REQP = ["instance_name", "mode", "disks"]
4189
  REQ_BGL = False
4190

    
4191
  def ExpandNames(self):
4192
    self._ExpandAndLockInstance()
4193

    
4194
    if not hasattr(self.op, "remote_node"):
4195
      self.op.remote_node = None
4196

    
4197
    ia_name = getattr(self.op, "iallocator", None)
4198
    if ia_name is not None:
4199
      if self.op.remote_node is not None:
4200
        raise errors.OpPrereqError("Give either the iallocator or the new"
4201
                                   " secondary, not both")
4202
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4203
    elif self.op.remote_node is not None:
4204
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4205
      if remote_node is None:
4206
        raise errors.OpPrereqError("Node '%s' not known" %
4207
                                   self.op.remote_node)
4208
      self.op.remote_node = remote_node
4209
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4210
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4211
    else:
4212
      self.needed_locks[locking.LEVEL_NODE] = []
4213
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4214

    
4215
  def DeclareLocks(self, level):
4216
    # If we're not already locking all nodes in the set we have to declare the
4217
    # instance's primary/secondary nodes.
4218
    if (level == locking.LEVEL_NODE and
4219
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4220
      self._LockInstancesNodes()
4221

    
4222
  def _RunAllocator(self):
4223
    """Compute a new secondary node using an IAllocator.
4224

4225
    """
4226
    ial = IAllocator(self,
4227
                     mode=constants.IALLOCATOR_MODE_RELOC,
4228
                     name=self.op.instance_name,
4229
                     relocate_from=[self.sec_node])
4230

    
4231
    ial.Run(self.op.iallocator)
4232

    
4233
    if not ial.success:
4234
      raise errors.OpPrereqError("Can't compute nodes using"
4235
                                 " iallocator '%s': %s" % (self.op.iallocator,
4236
                                                           ial.info))
4237
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
4241
    self.op.remote_node = ial.nodes[0]
4242
    self.LogInfo("Selected new secondary for the instance: %s",
4243
                 self.op.remote_node)
4244

    
4245
  def BuildHooksEnv(self):
4246
    """Build hooks env.
4247

4248
    This runs on the master, the primary and all the secondaries.
4249

4250
    """
4251
    env = {
4252
      "MODE": self.op.mode,
4253
      "NEW_SECONDARY": self.op.remote_node,
4254
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4255
      }
4256
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4257
    nl = [
4258
      self.cfg.GetMasterNode(),
4259
      self.instance.primary_node,
4260
      ]
4261
    if self.op.remote_node is not None:
4262
      nl.append(self.op.remote_node)
4263
    return env, nl, nl
4264

    
4265
  def CheckPrereq(self):
4266
    """Check prerequisites.
4267

4268
    This checks that the instance is in the cluster.
4269

4270
    """
4271
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4272
    assert instance is not None, \
4273
      "Cannot retrieve locked instance %s" % self.op.instance_name
4274
    self.instance = instance
4275

    
4276
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4277
      raise errors.OpPrereqError("Instance's disk layout is not"
4278
                                 " network mirrored.")
4279

    
4280
    if len(instance.secondary_nodes) != 1:
4281
      raise errors.OpPrereqError("The instance has a strange layout,"
4282
                                 " expected one secondary but found %d" %
4283
                                 len(instance.secondary_nodes))
4284

    
4285
    self.sec_node = instance.secondary_nodes[0]
4286

    
4287
    ia_name = getattr(self.op, "iallocator", None)
4288
    if ia_name is not None:
4289
      self._RunAllocator()
4290

    
4291
    remote_node = self.op.remote_node
4292
    if remote_node is not None:
4293
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4294
      assert self.remote_node_info is not None, \
4295
        "Cannot retrieve locked node %s" % remote_node
4296
    else:
4297
      self.remote_node_info = None
4298
    if remote_node == instance.primary_node:
4299
      raise errors.OpPrereqError("The specified node is the primary node of"
4300
                                 " the instance.")
4301
    elif remote_node == self.sec_node:
4302
      if self.op.mode == constants.REPLACE_DISK_SEC:
4303
        # this is for DRBD8, where we can't execute the same mode of
4304
        # replacement as for drbd7 (no different port allocated)
4305
        raise errors.OpPrereqError("Same secondary given, cannot execute"
4306
                                   " replacement")
4307
    if instance.disk_template == constants.DT_DRBD8:
4308
      if (self.op.mode == constants.REPLACE_DISK_ALL and
4309
          remote_node is not None):
4310
        # switch to replace secondary mode
4311
        self.op.mode = constants.REPLACE_DISK_SEC
4312

    
4313
      if self.op.mode == constants.REPLACE_DISK_ALL:
4314
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4315
                                   " secondary disk replacement, not"
4316
                                   " both at once")
4317
      elif self.op.mode == constants.REPLACE_DISK_PRI:
4318
        if remote_node is not None:
4319
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4320
                                     " the secondary while doing a primary"
4321
                                     " node disk replacement")
4322
        self.tgt_node = instance.primary_node
4323
        self.oth_node = instance.secondary_nodes[0]
4324
      elif self.op.mode == constants.REPLACE_DISK_SEC:
4325
        self.new_node = remote_node # this can be None, in which case
4326
                                    # we don't change the secondary
4327
        self.tgt_node = instance.secondary_nodes[0]
4328
        self.oth_node = instance.primary_node
4329
      else:
4330
        raise errors.ProgrammerError("Unhandled disk replace mode")
4331

    
4332
    if not self.op.disks:
4333
      self.op.disks = range(len(instance.disks))
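    # an empty disk list in the opcode means "replace all disks", so expand
    # it here into the full list of disk indices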
4334

    
4335
    for disk_idx in self.op.disks:
4336
      instance.FindDisk(disk_idx)
4337

    
4338
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.
4340

4341
    The algorithm for replace is quite complicated:
4342

4343
      1. for each disk to be replaced:
4344

4345
        1. create new LVs on the target node with unique names
4346
        1. detach old LVs from the drbd device
4347
        1. rename old LVs to name_replaced.<time_t>
4348
        1. rename new LVs to old LVs
4349
        1. attach the new LVs (with the old names now) to the drbd device
4350

4351
      1. wait for sync across all devices
4352

4353
      1. for each modified disk:
4354

4355
        1. remove old LVs (which have the name name_replaces.<time_t>)
4356

4357
    Failures are not very well handled.
4358

4359
    """
4360
    steps_total = 6
4361
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4362
    instance = self.instance
4363
    iv_names = {}
4364
    vgname = self.cfg.GetVGName()
4365
    # start of work
4366
    cfg = self.cfg
4367
    tgt_node = self.tgt_node
4368
    oth_node = self.oth_node
4369

    
4370
    # Step: check device activation
4371
    self.proc.LogStep(1, steps_total, "check device existence")
4372
    info("checking volume groups")
4373
    my_vg = cfg.GetVGName()
4374
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4375
    if not results:
4376
      raise errors.OpExecError("Can't list volume groups on the nodes")
4377
    for node in oth_node, tgt_node:
4378
      res = results[node]
4379
      if res.failed or not res.data or my_vg not in res.data:
4380
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4381
                                 (my_vg, node))
4382
    for idx, dev in enumerate(instance.disks):
4383
      if idx not in self.op.disks:
4384
        continue
4385
      for node in tgt_node, oth_node:
4386
        info("checking disk/%d on %s" % (idx, node))
4387
        cfg.SetDiskID(dev, node)
4388
        if not self.rpc.call_blockdev_find(node, dev):
4389
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4390
                                   (idx, node))
4391

    
4392
    # Step: check other node consistency
4393
    self.proc.LogStep(2, steps_total, "check peer consistency")
4394
    for idx, dev in enumerate(instance.disks):
4395
      if idx not in self.op.disks:
4396
        continue
4397
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4398
      if not _CheckDiskConsistency(self, dev, oth_node,
4399
                                   oth_node==instance.primary_node):
4400
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4401
                                 " to replace disks on this node (%s)" %
4402
                                 (oth_node, tgt_node))
4403

    
4404
    # Step: create new storage
4405
    self.proc.LogStep(3, steps_total, "allocate new storage")
4406
    for idx, dev in enumerate(instance.disks):
4407
      if idx not in self.op.disks:
4408
        continue
4409
      size = dev.size
4410
      cfg.SetDiskID(dev, tgt_node)
4411
      lv_names = [".disk%d_%s" % (idx, suf)
4412
                  for suf in ["data", "meta"]]
4413
      names = _GenerateUniqueNames(self, lv_names)
4414
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4415
                             logical_id=(vgname, names[0]))
4416
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4417
                             logical_id=(vgname, names[1]))
4418
      new_lvs = [lv_data, lv_meta]
4419
      old_lvs = dev.children
4420
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
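      # each DRBD8 disk is backed by a data LV and a small metadata LV; we
      # remember (drbd device, current LVs, freshly created LVs) keyed by
      # iv_name so the detach/rename/attach and cleanup steps below operate
      # on exactly what was allocated here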
4421
      info("creating new local storage on %s for %s" %
4422
           (tgt_node, dev.iv_name))
4423
      # since we *always* want to create this LV, we use the
4424
      # _Create...OnPrimary (which forces the creation), even if we
4425
      # are talking about the secondary node
4426
      for new_lv in new_lvs:
4427
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4428
                                        _GetInstanceInfoText(instance)):
4429
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4430
                                   " node '%s'" %
4431
                                   (new_lv.logical_id[1], tgt_node))
4432

    
4433
    # Step: for each lv, detach+rename*2+attach
4434
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4435
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4436
      info("detaching %s drbd from local storage" % dev.iv_name)
4437
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4438
      result.Raise()
4439
      if not result.data:
4440
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4441
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4442
      #dev.children = []
4443
      #cfg.Update(instance)
4444

    
4445
      # ok, we created the new LVs, so now we know we have the needed
4446
      # storage; as such, we proceed on the target node to rename
4447
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4448
      # using the assumption that logical_id == physical_id (which in
4449
      # turn is the unique_id on that node)
4450

    
4451
      # FIXME(iustin): use a better name for the replaced LVs
4452
      temp_suffix = int(time.time())
4453
      ren_fn = lambda d, suff: (d.physical_id[0],
4454
                                d.physical_id[1] + "_replaced-%s" % suff)
4455
      # build the rename list based on what LVs exist on the node
4456
      rlist = []
4457
      for to_ren in old_lvs:
4458
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4459
        if not find_res.failed and find_res.data is not None: # device exists
4460
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4461

    
4462
      info("renaming the old LVs on the target node")
4463
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4464
      result.Raise()
4465
      if not result.data:
4466
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4467
      # now we rename the new LVs to the old LVs
4468
      info("renaming the new LVs on the target node")
4469
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4470
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4471
      result.Raise()
4472
      if not result.data:
4473
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4474

    
4475
      for old, new in zip(old_lvs, new_lvs):
4476
        new.logical_id = old.logical_id
4477
        cfg.SetDiskID(new, tgt_node)
4478

    
4479
      for disk in old_lvs:
4480
        disk.logical_id = ren_fn(disk, temp_suffix)
4481
        cfg.SetDiskID(disk, tgt_node)
4482

    
4483
      # now that the new lvs have the old name, we can add them to the device
4484
      info("adding new mirror component on %s" % tgt_node)
4485
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4486
      if result.failed or not result.data:
4487
        for new_lv in new_lvs:
4488
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4489
          if result.failed or not result.data:
4490
            warning("Can't rollback device %s", hint="manually cleanup unused"
4491
                    " logical volumes")
4492
        raise errors.OpExecError("Can't add local storage to drbd")
4493

    
4494
      dev.children = new_lvs
4495
      cfg.Update(instance)
4496

    
4497
    # Step: wait for sync
4498

    
4499
    # this can fail as the old devices are degraded and _WaitForSync
4500
    # does a combined result over all disks, so we don't check its
4501
    # return value
4502
    self.proc.LogStep(5, steps_total, "sync devices")
4503
    _WaitForSync(self, instance, unlock=True)
4504

    
4505
    # so check manually all the devices
4506
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4507
      cfg.SetDiskID(dev, instance.primary_node)
4508
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4509
      if result.failed or result.data[5]:
4510
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4511

    
4512
    # Step: remove old storage
4513
    self.proc.LogStep(6, steps_total, "removing old storage")
4514
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4515
      info("remove logical volumes for %s" % name)
4516
      for lv in old_lvs:
4517
        cfg.SetDiskID(lv, tgt_node)
4518
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
4519
        if result.failed or not result.data:
4520
          warning("Can't remove old LV", hint="manually remove unused LVs")
4521
          continue
4522

    
4523
  def _ExecD8Secondary(self, feedback_fn):
4524
    """Replace the secondary node for drbd8.
4525

4526
    The algorithm for replace is quite complicated:
4527
      - for all disks of the instance:
4528
        - create new LVs on the new node with same names
4529
        - shutdown the drbd device on the old secondary
4530
        - disconnect the drbd network on the primary
4531
        - create the drbd device on the new secondary
4532
        - network attach the drbd on the primary, using an artifice:
4533
          the drbd code for Attach() will connect to the network if it
4534
          finds a device which is connected to the good local disks but
4535
          not network enabled
4536
      - wait for sync across all devices
4537
      - remove all disks from the old secondary
4538

4539
    Failures are not very well handled.
4540

4541
    """
4542
    steps_total = 6
4543
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4544
    instance = self.instance
4545
    iv_names = {}
4546
    vgname = self.cfg.GetVGName()
4547
    # start of work
4548
    cfg = self.cfg
4549
    old_node = self.tgt_node
4550
    new_node = self.new_node
4551
    pri_node = instance.primary_node
4552

    
4553
    # Step: check device activation
4554
    self.proc.LogStep(1, steps_total, "check device existence")
4555
    info("checking volume groups")
4556
    my_vg = cfg.GetVGName()
4557
    results = self.rpc.call_vg_list([pri_node, new_node])
4558
    for node in pri_node, new_node:
4559
      res = results[node]
4560
      if res.failed or not res.data or my_vg not in res.data:
4561
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4562
                                 (my_vg, node))
4563
    for idx, dev in enumerate(instance.disks):
4564
      if idx not in self.op.disks:
4565
        continue
4566
      info("checking disk/%d on %s" % (idx, pri_node))
4567
      cfg.SetDiskID(dev, pri_node)
4568
      result = self.rpc.call_blockdev_find(pri_node, dev)
4569
      result.Raise()
4570
      if not result.data:
4571
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4572
                                 (idx, pri_node))
4573

    
4574
    # Step: check other node consistency
4575
    self.proc.LogStep(2, steps_total, "check peer consistency")
4576
    for idx, dev in enumerate(instance.disks):
4577
      if idx not in self.op.disks:
4578
        continue
4579
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4580
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4581
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4582
                                 " unsafe to replace the secondary" %
4583
                                 pri_node)
4584

    
4585
    # Step: create new storage
4586
    self.proc.LogStep(3, steps_total, "allocate new storage")
4587
    for idx, dev in enumerate(instance.disks):
4588
      size = dev.size
4589
      info("adding new local storage on %s for disk/%d" %
4590
           (new_node, idx))
4591
      # since we *always* want to create this LV, we use the
4592
      # _Create...OnPrimary (which forces the creation), even if we
4593
      # are talking about the secondary node
4594
      for new_lv in dev.children:
4595
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4596
                                        _GetInstanceInfoText(instance)):
4597
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4598
                                   " node '%s'" %
4599
                                   (new_lv.logical_id[1], new_node))
4600

    
4601
    # Step 4: drbd minors and drbd setup changes
4602
    # after this, we must manually remove the drbd minors on both the
4603
    # error and the success paths
4604
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4605
                                   instance.name)
4606
    logging.debug("Allocated minors %s" % (minors,))
4607
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4608
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4609
      size = dev.size
4610
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4611
      # create new devices on new_node
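      # the DRBD8 logical_id is (roughly) the tuple (node_a, node_b, port,
      # minor_a, minor_b, secret); we keep the primary's entries untouched
      # and only swap in the new node name and its freshly allocated minor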
4612
      if pri_node == dev.logical_id[0]:
4613
        new_logical_id = (pri_node, new_node,
4614
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4615
                          dev.logical_id[5])
4616
      else:
4617
        new_logical_id = (new_node, pri_node,
4618
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4619
                          dev.logical_id[5])
4620
      iv_names[idx] = (dev, dev.children, new_logical_id)
4621
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4622
                    new_logical_id)
4623
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4624
                              logical_id=new_logical_id,
4625
                              children=dev.children)
4626
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4627
                                        new_drbd, False,
4628
                                        _GetInstanceInfoText(instance)):
4629
        self.cfg.ReleaseDRBDMinors(instance.name)
4630
        raise errors.OpExecError("Failed to create new DRBD on"
4631
                                 " node '%s'" % new_node)
4632

    
4633
    for idx, dev in enumerate(instance.disks):
4634
      # we have new devices, shutdown the drbd on the old secondary
4635
      info("shutting down drbd for disk/%d on old node" % idx)
4636
      cfg.SetDiskID(dev, old_node)
4637
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
4638
      if result.failed or not result.data:
4639
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4640
                hint="Please cleanup this device manually as soon as possible")
4641

    
4642
    info("detaching primary drbds from the network (=> standalone)")
4643
    done = 0
4644
    for idx, dev in enumerate(instance.disks):
4645
      cfg.SetDiskID(dev, pri_node)
4646
      # set the network part of the physical (unique in bdev terms) id
4647
      # to None, meaning detach from network
4648
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4649
      # and 'find' the device, which will 'fix' it to match the
4650
      # standalone state
4651
      result = self.rpc.call_blockdev_find(pri_node, dev)
4652
      if not result.failed and result.data:
4653
        done += 1
4654
      else:
4655
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4656
                idx)
4657

    
4658
    if not done:
4659
      # no detaches succeeded (very unlikely)
4660
      self.cfg.ReleaseDRBDMinors(instance.name)
4661
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4662

    
4663
    # if we managed to detach at least one, we update all the disks of
4664
    # the instance to point to the new secondary
4665
    info("updating instance configuration")
4666
    for dev, _, new_logical_id in iv_names.itervalues():
4667
      dev.logical_id = new_logical_id
4668
      cfg.SetDiskID(dev, pri_node)
4669
    cfg.Update(instance)
4670
    # we can remove now the temp minors as now the new values are
4671
    # written to the config file (and therefore stable)
4672
    self.cfg.ReleaseDRBDMinors(instance.name)
4673

    
4674
    # and now perform the drbd attach
4675
    info("attaching primary drbds to new secondary (standalone => connected)")
4676
    failures = []
4677
    for idx, dev in enumerate(instance.disks):
4678
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4679
      # since the attach is smart, it's enough to 'find' the device,
4680
      # it will automatically activate the network, if the physical_id
4681
      # is correct
4682
      cfg.SetDiskID(dev, pri_node)
4683
      logging.debug("Disk to attach: %s", dev)
4684
      result = self.rpc.call_blockdev_find(pri_node, dev)
4685
      if result.failed or not result.data:
4686
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4687
                "please do a gnt-instance info to see the status of disks")
4688

    
4689
    # this can fail as the old devices are degraded and _WaitForSync
4690
    # does a combined result over all disks, so we don't check its
4691
    # return value
4692
    self.proc.LogStep(5, steps_total, "sync devices")
4693
    _WaitForSync(self, instance, unlock=True)
4694

    
4695
    # so check manually all the devices
4696
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4697
      cfg.SetDiskID(dev, pri_node)
4698
      result = self.rpc.call_blockdev_find(pri_node, dev)
4699
      result.Raise()
4700
      if result.data[5]:
4701
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4702

    
4703
    self.proc.LogStep(6, steps_total, "removing old storage")
4704
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4705
      info("remove logical volumes for disk/%d" % idx)
4706
      for lv in old_lvs:
4707
        cfg.SetDiskID(lv, old_node)
4708
        result = self.rpc.call_blockdev_remove(old_node, lv)
4709
        if result.failed or not result.data:
4710
          warning("Can't remove LV on old secondary",
4711
                  hint="Cleanup stale volumes by hand")
4712

    
4713
  def Exec(self, feedback_fn):
4714
    """Execute disk replacement.
4715

4716
    This dispatches the disk replacement to the appropriate handler.
4717

4718
    """
4719
    instance = self.instance
4720

    
4721
    # Activate the instance disks if we're replacing them on a down instance
4722
    if instance.status == "down":
4723
      _StartInstanceDisks(self, instance, True)
4724

    
4725
    if instance.disk_template == constants.DT_DRBD8:
4726
      if self.op.remote_node is None:
4727
        fn = self._ExecD8DiskOnly
4728
      else:
4729
        fn = self._ExecD8Secondary
4730
    else:
4731
      raise errors.ProgrammerError("Unhandled disk replacement case")
4732

    
4733
    ret = fn(feedback_fn)
4734

    
4735
    # Deactivate the instance disks if we're replacing them on a down instance
4736
    if instance.status == "down":
4737
      _SafeShutdownInstanceDisks(self, instance)
4738

    
4739
    return ret


class LUGrowDisk(LogicalUnit):
4743
  """Grow a disk of an instance.
4744

4745
  """
4746
  HPATH = "disk-grow"
4747
  HTYPE = constants.HTYPE_INSTANCE
4748
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4749
  REQ_BGL = False
4750

    
4751
  def ExpandNames(self):
4752
    self._ExpandAndLockInstance()
4753
    self.needed_locks[locking.LEVEL_NODE] = []
4754
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4755

    
4756
  def DeclareLocks(self, level):
4757
    if level == locking.LEVEL_NODE:
4758
      self._LockInstancesNodes()
4759

    
4760
  def BuildHooksEnv(self):
4761
    """Build hooks env.
4762

4763
    This runs on the master, the primary and all the secondaries.
4764

4765
    """
4766
    env = {
4767
      "DISK": self.op.disk,
4768
      "AMOUNT": self.op.amount,
4769
      }
4770
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4771
    nl = [
4772
      self.cfg.GetMasterNode(),
4773
      self.instance.primary_node,
4774
      ]
4775
    return env, nl, nl
4776

    
4777
  def CheckPrereq(self):
4778
    """Check prerequisites.
4779

4780
    This checks that the instance is in the cluster.
4781

4782
    """
4783
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4784
    assert instance is not None, \
4785
      "Cannot retrieve locked instance %s" % self.op.instance_name
4786

    
4787
    self.instance = instance
4788

    
4789
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4790
      raise errors.OpPrereqError("Instance's disk layout does not support"
4791
                                 " growing.")
4792

    
4793
    self.disk = instance.FindDisk(self.op.disk)
4794

    
4795
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4796
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4797
                                       instance.hypervisor)
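    # node_info returns, per node, a dict including 'vg_free' (in MiB); the
    # loop below refuses the grow if any node backing the disk lacks
    # self.op.amount MiB of free space in its volume group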
4798
    for node in nodenames:
4799
      info = nodeinfo[node]
4800
      if info.failed or not info.data:
4801
        raise errors.OpPrereqError("Cannot get current information"
4802
                                   " from node '%s'" % node)
4803
      vg_free = info.data.get('vg_free', None)
4804
      if not isinstance(vg_free, int):
4805
        raise errors.OpPrereqError("Can't compute free disk space on"
4806
                                   " node %s" % node)
4807
      if self.op.amount > vg_free:
4808
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4809
                                   " %d MiB available, %d MiB required" %
4810
                                   (node, vg_free, self.op.amount))
4811

    
4812
  def Exec(self, feedback_fn):
4813
    """Execute disk grow.
4814

4815
    """
4816
    instance = self.instance
4817
    disk = self.disk
4818
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4819
      self.cfg.SetDiskID(disk, node)
4820
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4821
      result.Raise()
4822
      if (not result.data or not isinstance(result.data, (list, tuple)) or
4823
          len(result.data) != 2):
4824
        raise errors.OpExecError("Grow request failed to node %s" % node)
4825
      elif not result.data[0]:
4826
        raise errors.OpExecError("Grow request failed to node %s: %s" %
4827
                                 (node, result.data[1]))
4828
    disk.RecordGrow(self.op.amount)
4829
    self.cfg.Update(instance)
4830
    if self.op.wait_for_sync:
4831
      disk_abort = not _WaitForSync(self, instance)
4832
      if disk_abort:
4833
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4834
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
4838
  """Query runtime instance data.
4839

4840
  """
4841
  _OP_REQP = ["instances", "static"]
4842
  REQ_BGL = False
4843

    
4844
  def ExpandNames(self):
4845
    self.needed_locks = {}
4846
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4847

    
4848
    if not isinstance(self.op.instances, list):
4849
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4850

    
4851
    if self.op.instances:
4852
      self.wanted_names = []
4853
      for name in self.op.instances:
4854
        full_name = self.cfg.ExpandInstanceName(name)
4855
        if full_name is None:
4856
          raise errors.OpPrereqError("Instance '%s' not known" %
4857
                                     self.op.instance_name)
4858
        self.wanted_names.append(full_name)
4859
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4860
    else:
4861
      self.wanted_names = None
4862
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4863

    
4864
    self.needed_locks[locking.LEVEL_NODE] = []
4865
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4866

    
4867
  def DeclareLocks(self, level):
4868
    if level == locking.LEVEL_NODE:
4869
      self._LockInstancesNodes()
4870

    
4871
  def CheckPrereq(self):
4872
    """Check prerequisites.
4873

4874
    This only checks the optional instance list against the existing names.
4875

4876
    """
4877
    if self.wanted_names is None:
4878
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4879

    
4880
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4881
                             in self.wanted_names]
4882
    return
4883

    
4884
  def _ComputeDiskStatus(self, instance, snode, dev):
4885
    """Compute block device status.
4886

4887
    """
4888
    static = self.op.static
4889
    if not static:
4890
      self.cfg.SetDiskID(dev, instance.primary_node)
4891
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4892
      dev_pstatus.Raise()
4893
      dev_pstatus = dev_pstatus.data
4894
    else:
4895
      dev_pstatus = None
4896

    
4897
    if dev.dev_type in constants.LDS_DRBD:
4898
      # we change the snode then (otherwise we use the one passed in)
4899
      if dev.logical_id[0] == instance.primary_node:
4900
        snode = dev.logical_id[1]
4901
      else:
4902
        snode = dev.logical_id[0]
4903

    
4904
    if snode and not static:
4905
      self.cfg.SetDiskID(dev, snode)
4906
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4907
      dev_sstatus.Raise()
4908
      dev_sstatus = dev_sstatus.data
4909
    else:
4910
      dev_sstatus = None
4911

    
4912
    if dev.children:
4913
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4914
                      for child in dev.children]
4915
    else:
4916
      dev_children = []
4917

    
4918
    data = {
4919
      "iv_name": dev.iv_name,
4920
      "dev_type": dev.dev_type,
4921
      "logical_id": dev.logical_id,
4922
      "physical_id": dev.physical_id,
4923
      "pstatus": dev_pstatus,
4924
      "sstatus": dev_sstatus,
4925
      "children": dev_children,
4926
      "mode": dev.mode,
4927
      }
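    # the children entries are recursive copies of this same structure, one
    # per backing device (e.g. the data and metadata LVs under a DRBD8 disk)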
4928

    
4929
    return data
4930

    
4931
  def Exec(self, feedback_fn):
4932
    """Gather and return data"""
4933
    result = {}
4934

    
4935
    cluster = self.cfg.GetClusterInfo()
4936

    
4937
    for instance in self.wanted_instances:
4938
      if not self.op.static:
4939
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4940
                                                  instance.name,
4941
                                                  instance.hypervisor)
4942
        remote_info.Raise()
4943
        remote_info = remote_info.data
4944
        if remote_info and "state" in remote_info:
4945
          remote_state = "up"
4946
        else:
4947
          remote_state = "down"
4948
      else:
4949
        remote_state = None
4950
      if instance.status == "down":
4951
        config_state = "down"
4952
      else:
4953
        config_state = "up"
4954

    
4955
      disks = [self._ComputeDiskStatus(instance, None, device)
4956
               for device in instance.disks]
4957

    
4958
      idict = {
4959
        "name": instance.name,
4960
        "config_state": config_state,
4961
        "run_state": remote_state,
4962
        "pnode": instance.primary_node,
4963
        "snodes": instance.secondary_nodes,
4964
        "os": instance.os,
4965
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4966
        "disks": disks,
4967
        "hypervisor": instance.hypervisor,
4968
        "network_port": instance.network_port,
4969
        "hv_instance": instance.hvparams,
4970
        "hv_actual": cluster.FillHV(instance),
4971
        "be_instance": instance.beparams,
4972
        "be_actual": cluster.FillBE(instance),
4973
        }
4974

    
4975
      result[instance.name] = idict
4976

    
4977
    return result


class LUSetInstanceParams(LogicalUnit):
4981
  """Modifies an instances's parameters.
4982

4983
  """
4984
  HPATH = "instance-modify"
4985
  HTYPE = constants.HTYPE_INSTANCE
4986
  _OP_REQP = ["instance_name"]
4987
  REQ_BGL = False
4988

    
4989
  def CheckArguments(self):
4990
    if not hasattr(self.op, 'nics'):
4991
      self.op.nics = []
4992
    if not hasattr(self.op, 'disks'):
4993
      self.op.disks = []
4994
    if not hasattr(self.op, 'beparams'):
4995
      self.op.beparams = {}
4996
    if not hasattr(self.op, 'hvparams'):
4997
      self.op.hvparams = {}
4998
    self.op.force = getattr(self.op, "force", False)
4999
    if not (self.op.nics or self.op.disks or
5000
            self.op.hvparams or self.op.beparams):
5001
      raise errors.OpPrereqError("No changes submitted")
5002

    
5003
    utils.CheckBEParams(self.op.beparams)
5004

    
5005
    # Disk validation
5006
    disk_addremove = 0
5007
    for disk_op, disk_dict in self.op.disks:
5008
      if disk_op == constants.DDM_REMOVE:
5009
        disk_addremove += 1
5010
        continue
5011
      elif disk_op == constants.DDM_ADD:
5012
        disk_addremove += 1
5013
      else:
5014
        if not isinstance(disk_op, int):
5015
          raise errors.OpPrereqError("Invalid disk index")
5016
      if disk_op == constants.DDM_ADD:
5017
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5018
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5019
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5020
        size = disk_dict.get('size', None)
5021
        if size is None:
5022
          raise errors.OpPrereqError("Required disk parameter size missing")
5023
        try:
5024
          size = int(size)
5025
        except ValueError, err:
5026
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5027
                                     str(err))
5028
        disk_dict['size'] = size
5029
      else:
5030
        # modification of disk
5031
        if 'size' in disk_dict:
5032
          raise errors.OpPrereqError("Disk size change not possible, use"
5033
                                     " grow-disk")
5034

    
5035
    if disk_addremove > 1:
5036
      raise errors.OpPrereqError("Only one disk add or remove operation"
5037
                                 " supported at a time")
5038

    
5039
    # NIC validation
5040
    nic_addremove = 0
5041
    for nic_op, nic_dict in self.op.nics:
5042
      if nic_op == constants.DDM_REMOVE:
5043
        nic_addremove += 1
5044
        continue
5045
      elif nic_op == constants.DDM_ADD:
5046
        nic_addremove += 1
5047
      else:
5048
        if not isinstance(nic_op, int):
5049
          raise errors.OpPrereqError("Invalid nic index")
5050

    
5051
      # nic_dict should be a dict
5052
      nic_ip = nic_dict.get('ip', None)
5053
      if nic_ip is not None:
5054
        if nic_ip.lower() == "none":
5055
          nic_dict['ip'] = None
5056
        else:
5057
          if not utils.IsValidIP(nic_ip):
5058
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5059
      # we can only check None bridges and assign the default one
5060
      nic_bridge = nic_dict.get('bridge', None)
5061
      if nic_bridge is None:
5062
        nic_dict['bridge'] = self.cfg.GetDefBridge()
5063
      # but we can validate MACs
5064
      nic_mac = nic_dict.get('mac', None)
5065
      if nic_mac is not None:
5066
        if self.cfg.IsMacInUse(nic_mac):
5067
          raise errors.OpPrereqError("MAC address %s already in use"
5068
                                     " in cluster" % nic_mac)
5069
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5070
          if not utils.IsValidMac(nic_mac):
5071
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5072
    if nic_addremove > 1:
5073
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5074
                                 " supported at a time")
5075

    
5076
  def ExpandNames(self):
5077
    self._ExpandAndLockInstance()
5078
    self.needed_locks[locking.LEVEL_NODE] = []
5079
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5080

    
5081
  def DeclareLocks(self, level):
5082
    if level == locking.LEVEL_NODE:
5083
      self._LockInstancesNodes()
5084

    
5085
  def BuildHooksEnv(self):
5086
    """Build hooks env.
5087

5088
    This runs on the master, primary and secondaries.
5089

5090
    """
5091
    args = dict()
5092
    if constants.BE_MEMORY in self.be_new:
5093
      args['memory'] = self.be_new[constants.BE_MEMORY]
5094
    if constants.BE_VCPUS in self.be_new:
5095
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5096
    # FIXME: readd disk/nic changes
5097
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5098
    nl = [self.cfg.GetMasterNode(),
5099
          self.instance.primary_node] + list(self.instance.secondary_nodes)
5100
    return env, nl, nl
5101

    
5102
  def CheckPrereq(self):
5103
    """Check prerequisites.
5104

5105
    This only checks the instance list against the existing names.
5106

5107
    """
5108
    force = self.force = self.op.force
5109

    
5110
    # checking the new params on the primary/secondary nodes
5111

    
5112
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5113
    assert self.instance is not None, \
5114
      "Cannot retrieve locked instance %s" % self.op.instance_name
5115
    pnode = self.instance.primary_node
5116
    nodelist = [pnode]
5117
    nodelist.extend(instance.secondary_nodes)
5118

    
5119
    # hvparams processing
5120
    if self.op.hvparams:
5121
      i_hvdict = copy.deepcopy(instance.hvparams)
5122
      for key, val in self.op.hvparams.iteritems():
5123
        if val == constants.VALUE_DEFAULT:
5124
          try:
5125
            del i_hvdict[key]
5126
          except KeyError:
5127
            pass
5128
        elif val == constants.VALUE_NONE:
5129
          i_hvdict[key] = None
5130
        else:
5131
          i_hvdict[key] = val
5132
      cluster = self.cfg.GetClusterInfo()
5133
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5134
                                i_hvdict)
5135
      # local check
5136
      hypervisor.GetHypervisor(
5137
        instance.hypervisor).CheckParameterSyntax(hv_new)
5138
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5139
      self.hv_new = hv_new # the new actual values
5140
      self.hv_inst = i_hvdict # the new dict (without defaults)
5141
    else:
5142
      self.hv_new = self.hv_inst = {}
5143

    
5144
    # beparams processing
5145
    if self.op.beparams:
5146
      i_bedict = copy.deepcopy(instance.beparams)
5147
      for key, val in self.op.beparams.iteritems():
5148
        if val == constants.VALUE_DEFAULT:
5149
          try:
5150
            del i_bedict[key]
5151
          except KeyError:
5152
            pass
5153
        else:
5154
          i_bedict[key] = val
5155
      cluster = self.cfg.GetClusterInfo()
5156
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5157
                                i_bedict)
5158
      self.be_new = be_new # the new actual values
5159
      self.be_inst = i_bedict # the new dict (without defaults)
5160
    else:
5161
      self.be_new = self.be_inst = {}
5162

    
5163
    self.warn = []
5164

    
5165
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5166
      mem_check_list = [pnode]
5167
      if be_new[constants.BE_AUTO_BALANCE]:
5168
        # either we changed auto_balance to yes or it was from before
5169
        mem_check_list.extend(instance.secondary_nodes)
5170
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5171
                                                  instance.hypervisor)
5172
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5173
                                         instance.hypervisor)
5174
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5175
        # Assume the primary node is unreachable and go ahead
5176
        self.warn.append("Can't get info from primary node %s" % pnode)
5177
      else:
5178
        if not instance_info.failed and instance_info.data:
5179
          current_mem = instance_info.data['memory']
5180
        else:
5181
          # Assume instance not running
5182
          # (there is a slight race condition here, but it's not very probable,
5183
          # and we have no other way to check)
5184
          current_mem = 0
5185
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5186
                    nodeinfo[pnode].data['memory_free'])
5187
        if miss_mem > 0:
5188
          raise errors.OpPrereqError("This change will prevent the instance"
5189
                                     " from starting, due to %d MB of memory"
5190
                                     " missing on its primary node" % miss_mem)
5191

    
5192
      if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          nres = nodeinfo[node]
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)
5199

    
5200
    # NIC processing
5201
    for nic_op, nic_dict in self.op.nics:
5202
      if nic_op == constants.DDM_REMOVE:
5203
        if not instance.nics:
5204
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5205
        continue
5206
      if nic_op != constants.DDM_ADD:
5207
        # an existing nic
5208
        if nic_op < 0 or nic_op >= len(instance.nics):
5209
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5210
                                     " are 0 to %d" %
5211
                                     (nic_op, len(instance.nics)))
5212
      nic_bridge = nic_dict.get('bridge', None)
5213
      if nic_bridge is not None:
5214
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5215
          msg = ("Bridge '%s' doesn't exist on one of"
5216
                 " the instance nodes" % nic_bridge)
5217
          if self.force:
5218
            self.warn.append(msg)
5219
          else:
5220
            raise errors.OpPrereqError(msg)
5221

    
5222
    # DISK processing
5223
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5224
      raise errors.OpPrereqError("Disk operations not supported for"
5225
                                 " diskless instances")
5226
    for disk_op, disk_dict in self.op.disks:
5227
      if disk_op == constants.DDM_REMOVE:
5228
        if len(instance.disks) == 1:
5229
          raise errors.OpPrereqError("Cannot remove the last disk of"
5230
                                     " an instance")
5231
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
5236
          raise errors.OpPrereqError("Instance is running, can't remove"
5237
                                     " disks.")
5238

    
5239
      if (disk_op == constants.DDM_ADD and
5240
          len(instance.disks) >= constants.MAX_DISKS):
5241
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5242
                                   " add more" % constants.MAX_DISKS)
5243
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5244
        # an existing disk
5245
        if disk_op < 0 or disk_op >= len(instance.disks):
5246
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5247
                                     " are 0 to %d" %
5248
                                     (disk_op, len(instance.disks)))
5249

    
5250
    return
5251

    
5252
  def Exec(self, feedback_fn):
5253
    """Modifies an instance.
5254

5255
    All parameters take effect only at the next restart of the instance.
5256

5257
    """
5258
    # Process here the warnings from CheckPrereq, as we don't have a
5259
    # feedback_fn there.
5260
    for warn in self.warn:
5261
      feedback_fn("WARNING: %s" % warn)
5262

    
5263
    result = []
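    # 'result' accumulates (parameter name, new value) pairs for every change
    # actually applied; it is returned to the caller as feedback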
5264
    instance = self.instance
5265
    # disk changes
5266
    for disk_op, disk_dict in self.op.disks:
5267
      if disk_op == constants.DDM_REMOVE:
5268
        # remove the last disk
5269
        device = instance.disks.pop()
5270
        device_idx = len(instance.disks)
5271
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          rpc_result = self.rpc.call_blockdev_remove(node, disk)
          if rpc_result.failed or not rpc_result.data:
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                 " continuing anyway", device_idx, node)
        result.append(("disk/%d" % device_idx, "remove"))
5278
      elif disk_op == constants.DDM_ADD:
5279
        # add a new disk
5280
        if instance.disk_template == constants.DT_FILE:
5281
          file_driver, file_path = instance.disks[0].logical_id
5282
          file_path = os.path.dirname(file_path)
5283
        else:
5284
          file_driver = file_path = None
5285
        disk_idx_base = len(instance.disks)
5286
        new_disk = _GenerateDiskTemplate(self,
5287
                                         instance.disk_template,
5288
                                         instance, instance.primary_node,
5289
                                         instance.secondary_nodes,
5290
                                         [disk_dict],
5291
                                         file_path,
5292
                                         file_driver,
5293
                                         disk_idx_base)[0]
5294
        new_disk.mode = disk_dict['mode']
5295
        instance.disks.append(new_disk)
5296
        info = _GetInstanceInfoText(instance)
5297

    
5298
        logging.info("Creating volume %s for instance %s",
5299
                     new_disk.iv_name, instance.name)
5300
        # Note: this needs to be kept in sync with _CreateDisks
5301
        #HARDCODE
5302
        for secondary_node in instance.secondary_nodes:
5303
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5304
                                            new_disk, False, info):
5305
            self.LogWarning("Failed to create volume %s (%s) on"
5306
                            " secondary node %s!",
5307
                            new_disk.iv_name, new_disk, secondary_node)
5308
        #HARDCODE
5309
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5310
                                        instance, new_disk, info):
5311
          self.LogWarning("Failed to create volume %s on primary!",
5312
                          new_disk.iv_name)
5313
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5314
                       (new_disk.size, new_disk.mode)))
5315
      else:
5316
        # change a given disk
5317
        instance.disks[disk_op].mode = disk_dict['mode']
5318
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5319
    # NIC changes
5320
    for nic_op, nic_dict in self.op.nics:
5321
      if nic_op == constants.DDM_REMOVE:
5322
        # remove the last nic
5323
        del instance.nics[-1]
5324
        result.append(("nic.%d" % len(instance.nics), "remove"))
5325
      elif nic_op == constants.DDM_ADD:
5326
        # add a new nic
5327
        if 'mac' not in nic_dict:
5328
          mac = constants.VALUE_GENERATE
5329
        else:
5330
          mac = nic_dict['mac']
5331
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5332
          mac = self.cfg.GenerateMAC()
5333
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5334
                              bridge=nic_dict.get('bridge', None))
5335
        instance.nics.append(new_nic)
5336
        result.append(("nic.%d" % (len(instance.nics) - 1),
5337
                       "add:mac=%s,ip=%s,bridge=%s" %
5338
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
5339
      else:
5340
        # change a given nic
5341
        for key in 'mac', 'ip', 'bridge':
5342
          if key in nic_dict:
5343
            setattr(instance.nics[nic_op], key, nic_dict[key])
5344
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5345

    
5346
    # hvparams changes
5347
    if self.op.hvparams:
5348
      instance.hvparams = self.hv_new
5349
      for key, val in self.op.hvparams.iteritems():
5350
        result.append(("hv/%s" % key, val))
5351

    
5352
    # beparams changes
5353
    if self.op.beparams:
5354
      instance.beparams = self.be_inst
5355
      for key, val in self.op.beparams.iteritems():
5356
        result.append(("be/%s" % key, val))
5357

    
5358
    self.cfg.Update(instance)
5359

    
5360
    return result


class LUQueryExports(NoHooksLU):
5364
  """Query the exports list
5365

5366
  """
5367
  _OP_REQP = ['nodes']
5368
  REQ_BGL = False
5369

    
5370
  def ExpandNames(self):
5371
    self.needed_locks = {}
5372
    self.share_locks[locking.LEVEL_NODE] = 1
5373
    if not self.op.nodes:
5374
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5375
    else:
5376
      self.needed_locks[locking.LEVEL_NODE] = \
5377
        _GetWantedNodes(self, self.op.nodes)
5378

    
5379
  def CheckPrereq(self):
5380
    """Check prerequisites.
5381

5382
    """
5383
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5384

    
5385
  def Exec(self, feedback_fn):
5386
    """Compute the list of all the exported system images.
5387

5388
    @rtype: dict
5389
    @return: a dictionary with the structure node->(export-list)
5390
        where export-list is a list of the instances exported on
5391
        that node.
5392

5393
    """
5394
    rpcresult = self.rpc.call_export_list(self.nodes)
5395
    result = {}
5396
    for node in rpcresult:
5397
      if rpcresult[node].failed:
5398
        result[node] = False
5399
      else:
5400
        result[node] = rpcresult[node].data
5401

    
5402
    return result


class LUExportInstance(LogicalUnit):
5406
  """Export an instance to an image in the cluster.
5407

5408
  """
5409
  HPATH = "instance-export"
5410
  HTYPE = constants.HTYPE_INSTANCE
5411
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
5412
  REQ_BGL = False
5413

    
5414
  def ExpandNames(self):
5415
    self._ExpandAndLockInstance()
5416
    # FIXME: lock only instance primary and destination node
5417
    #
5418
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
5424
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5425

    
5426
  def DeclareLocks(self, level):
5427
    """Last minute lock declaration."""
5428
    # All nodes are locked anyway, so nothing to do here.
5429

    
5430
  def BuildHooksEnv(self):
5431
    """Build hooks env.
5432

5433
    This will run on the master, primary node and target node.
5434

5435
    """
5436
    env = {
5437
      "EXPORT_NODE": self.op.target_node,
5438
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5439
      }
5440
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5441
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5442
          self.op.target_node]
5443
    return env, nl, nl
5444

    
5445
  def CheckPrereq(self):
5446
    """Check prerequisites.
5447

5448
    This checks that the instance and node names are valid.
5449

5450
    """
5451
    instance_name = self.op.instance_name
5452
    self.instance = self.cfg.GetInstanceInfo(instance_name)
5453
    assert self.instance is not None, \
5454
          "Cannot retrieve locked instance %s" % self.op.instance_name
5455

    
5456
    self.dst_node = self.cfg.GetNodeInfo(
5457
      self.cfg.ExpandNodeName(self.op.target_node))
5458

    
5459
    if self.dst_node is None:
5460
      # This is a wrong node name, not a non-locked node (all nodes are locked)
5461
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5462

    
5463
    # instance disk type verification
5464
    for disk in self.instance.disks:
5465
      if disk.dev_type == constants.LD_FILE:
5466
        raise errors.OpPrereqError("Export not supported for instances with"
5467
                                   " file-based disks")
5468

    
5469
  def Exec(self, feedback_fn):
5470
    """Export an instance to an image in the cluster.
5471

5472
    """
5473
    instance = self.instance
5474
    dst_node = self.dst_node
5475
    src_node = instance.primary_node
5476
    if self.op.shutdown:
5477
      # shutdown the instance, but not the disks
5478
      result = self.rpc.call_instance_shutdown(src_node, instance)
5479
      result.Raise()
5480
      if not result.data:
5481
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5482
                                 (instance.name, src_node))
5483

    
5484
    vgname = self.cfg.GetVGName()
5485

    
5486
    snap_disks = []
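    # snap_disks gets one entry per instance disk: either the snapshot LV
    # object or False if snapshotting that disk failed, so that disk indices
    # stay aligned in the export loop below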
5487

    
5488
    try:
5489
      for disk in instance.disks:
5490
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5491
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5492
        if new_dev_name.failed or not new_dev_name.data:
5493
          self.LogWarning("Could not snapshot block device %s on node %s",
5494
                          disk.logical_id[1], src_node)
5495
          snap_disks.append(False)
5496
        else:
5497
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5498
                                 logical_id=(vgname, new_dev_name.data),
5499
                                 physical_id=(vgname, new_dev_name.data),
5500
                                 iv_name=disk.iv_name)
5501
          snap_disks.append(new_dev)
5502

    
5503
    finally:
5504
      if self.op.shutdown and instance.status == "up":
5505
        result = self.rpc.call_instance_start(src_node, instance, None)
5506
        if result.failed or not result.data:
5507
          _ShutdownInstanceDisks(self, instance)
5508
          raise errors.OpExecError("Could not start instance")
5509

    
5510
    # TODO: check for size
5511

    
5512
    cluster_name = self.cfg.GetClusterName()
5513
    for idx, dev in enumerate(snap_disks):
5514
      if dev:
5515
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5516
                                               instance, cluster_name, idx)
5517
        if result.failed or not result.data:
5518
          self.LogWarning("Could not export block device %s from node %s to"
5519
                          " node %s", dev.logical_id[1], src_node,
5520
                          dst_node.name)
5521
        result = self.rpc.call_blockdev_remove(src_node, dev)
5522
        if result.failed or not result.data:
5523
          self.LogWarning("Could not remove snapshot block device %s from node"
5524
                          " %s", dev.logical_id[1], src_node)
5525

    
5526
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          result = self.rpc.call_export_remove(node, instance.name)
          if result.failed or not result.data:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

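    # Query every locked node for its exports and remove any export matching
    # the (possibly expanded) instance name.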
    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Add a list of tags to a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or
      _RELO_KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
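  # Illustrative usage from a logical unit (a sketch only; the allocator and
  # node names below are made up):
  #   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_RELOC,
  #                    name=instance.name,
  #                    relocate_from=["node1.example.com"])
  #   ial.Run("my-allocator")
  #   if ial.success:
  #     new_secondary = ial.nodes[0]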
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
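          # An instance "owns" its configured memory even when it currently
          # uses less, so charge the unused difference against the node's
          # reported free memory.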
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # build the result dict for this node
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        "offline": ninfo.offline,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
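    # For illustration only (all values below are made up), the request built
    # here looks roughly like:
    #   {"type": "allocate", "name": "inst1.example.com",
    #    "disk_template": "drbd", "required_nodes": 2,
    #    "memory": 512, "vcpus": 1, "os": "debian-etch",
    #    "disks": [{"size": 1024, "mode": "w"}, ...],
    #    "nics": [...], "tags": [...], "disk_space_total": ...}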
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters (success, info, nodes).

    """
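    # The allocator output is expected to be a JSON object along these lines
    # (values are illustrative only):
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node2.example.com", "node3.example.com"]}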
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

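    # For IALLOCATOR_DIR_IN we only return the generated input text; for
    # IALLOCATOR_DIR_OUT we run the named allocator and return its raw,
    # unvalidated output.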
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result