1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, ecc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
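  # Illustrative sketch, not part of the original code: an LU that only needs
  # to read cluster state could follow the rules above by acquiring all node
  # locks in shared mode.  The class name "LUExampleQuery" is hypothetical and
  # used only for this example.
  #
  #   class LUExampleQuery(NoHooksLU):
  #     _OP_REQP = []
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
  #       self.share_locks[locking.LEVEL_NODE] = 1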
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, an empty list (and not None) should
    be used.
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
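  # Illustrative sketch, not part of the original code: a minimal
  # BuildHooksEnv implementation, modelled on the cluster-level LUs further
  # below, which runs the hooks only on the master node.
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.cfg.GetClusterName()}
  #     mn = self.cfg.GetMasterNode()
  #     return env, [mn], [mn]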
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to lock only some instances' nodes, or
    to lock only primary or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
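  # Illustrative sketch, not part of the original code: a concurrent LU that
  # operates on a single instance would typically combine
  # _ExpandAndLockInstance with _LockInstancesNodes as described above.  The
  # class name "LUExampleInstanceOp" is hypothetical.
  #
  #   class LUExampleInstanceOp(LogicalUnit):
  #     _OP_REQP = ["instance_name"]
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self._ExpandAndLockInstance()
  #       self.needed_locks[locking.LEVEL_NODE] = []
  #       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #     def DeclareLocks(self, level):
  #       if level == locking.LEVEL_NODE:
  #         self._LockInstancesNodes()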
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = lu.cfg.GetInstanceList()
396
  return utils.NiceSort(wanted)
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the node is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445
                          memory, vcpus, nics):
446
  """Builds instance related env variables for hooks
447

448
  This builds the hook environment from individual variables.
449

450
  @type name: string
451
  @param name: the name of the instance
452
  @type primary_node: string
453
  @param primary_node: the name of the instance's primary node
454
  @type secondary_nodes: list
455
  @param secondary_nodes: list of secondary nodes as strings
456
  @type os_type: string
457
  @param os_type: the name of the instance's OS
458
  @type status: string
459
  @param status: the desired status of the instance
460
  @type memory: string
461
  @param memory: the memory size of the instance
462
  @type vcpus: string
463
  @param vcpus: the count of VCPUs the instance has
464
  @type nics: list
465
  @param nics: list of tuples (ip, bridge, mac) representing
466
      the NICs the instance  has
467
  @rtype: dict
468
  @return: the hook environment for this instance
469

470
  """
471
  env = {
472
    "OP_TARGET": name,
473
    "INSTANCE_NAME": name,
474
    "INSTANCE_PRIMARY": primary_node,
475
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476
    "INSTANCE_OS_TYPE": os_type,
477
    "INSTANCE_STATUS": status,
478
    "INSTANCE_MEMORY": memory,
479
    "INSTANCE_VCPUS": vcpus,
480
  }
481

    
482
  if nics:
483
    nic_count = len(nics)
484
    for idx, (ip, bridge, mac) in enumerate(nics):
485
      if ip is None:
486
        ip = ""
487
      env["INSTANCE_NIC%d_IP" % idx] = ip
488
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
490
  else:
491
    nic_count = 0
492

    
493
  env["INSTANCE_NIC_COUNT"] = nic_count
494

    
495
  return env
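# Illustrative sketch, not part of the original code: for an instance with a
# single NIC the resulting environment would look roughly like this (all
# values below are made up for the example):
#
#   {
#     "OP_TARGET": "instance1.example.com",
#     "INSTANCE_NAME": "instance1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#     "INSTANCE_NIC_COUNT": 1,
#   }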
496

    
497

    
498
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499
  """Builds instance related env variables for hooks from an object.
500

501
  @type lu: L{LogicalUnit}
502
  @param lu: the logical unit on whose behalf we execute
503
  @type instance: L{objects.Instance}
504
  @param instance: the instance for which we should build the
505
      environment
506
  @type override: dict
507
  @param override: dictionary with key/values that will override
508
      our values
509
  @rtype: dict
510
  @return: the hook environment dictionary
511

512
  """
513
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
514
  args = {
515
    'name': instance.name,
516
    'primary_node': instance.primary_node,
517
    'secondary_nodes': instance.secondary_nodes,
518
    'os_type': instance.os,
519
    'status': instance.status,
520
    'memory': bep[constants.BE_MEMORY],
521
    'vcpus': bep[constants.BE_VCPUS],
522
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
523
  }
524
  if override:
525
    args.update(override)
526
  return _BuildInstanceHookEnv(**args)
527

    
528

    
529
def _AdjustCandidatePool(lu):
530
  """Adjust the candidate pool after node operations.
531

532
  """
533
  mod_list = lu.cfg.MaintainCandidatePool()
534
  if mod_list:
535
    lu.LogInfo("Promoted nodes to master candidate role: %s",
536
               ", ".join(node.name for node in mod_list))
537
    for name in mod_list:
538
      lu.context.ReaddNode(name)
539
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
540
  if mc_now > mc_max:
541
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
542
               (mc_now, mc_max))
543

    
544

    
545
def _CheckInstanceBridgesExist(lu, instance):
546
  """Check that the brigdes needed by an instance exist.
547

548
  """
549
  # check bridges existence
550
  brlist = [nic.bridge for nic in instance.nics]
551
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
552
  result.Raise()
553
  if not result.data:
554
    raise errors.OpPrereqError("One or more target bridges %s does not"
555
                               " exist on destination node '%s'" %
556
                               (brlist, instance.primary_node))
557

    
558

    
559
class LUDestroyCluster(NoHooksLU):
560
  """Logical unit for destroying the cluster.
561

562
  """
563
  _OP_REQP = []
564

    
565
  def CheckPrereq(self):
566
    """Check prerequisites.
567

568
    This checks whether the cluster is empty.
569

570
    Any errors are signalled by raising errors.OpPrereqError.
571

572
    """
573
    master = self.cfg.GetMasterNode()
574

    
575
    nodelist = self.cfg.GetNodeList()
576
    if len(nodelist) != 1 or nodelist[0] != master:
577
      raise errors.OpPrereqError("There are still %d node(s) in"
578
                                 " this cluster." % (len(nodelist) - 1))
579
    instancelist = self.cfg.GetInstanceList()
580
    if instancelist:
581
      raise errors.OpPrereqError("There are still %d instance(s) in"
582
                                 " this cluster." % len(instancelist))
583

    
584
  def Exec(self, feedback_fn):
585
    """Destroys the cluster.
586

587
    """
588
    master = self.cfg.GetMasterNode()
589
    result = self.rpc.call_node_stop_master(master, False)
590
    result.Raise()
591
    if not result.data:
592
      raise errors.OpExecError("Could not disable the master role")
593
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594
    utils.CreateBackup(priv_key)
595
    utils.CreateBackup(pub_key)
596
    return master
597

    
598

    
599
class LUVerifyCluster(LogicalUnit):
600
  """Verifies the cluster status.
601

602
  """
603
  HPATH = "cluster-verify"
604
  HTYPE = constants.HTYPE_CLUSTER
605
  _OP_REQP = ["skip_checks"]
606
  REQ_BGL = False
607

    
608
  def ExpandNames(self):
609
    self.needed_locks = {
610
      locking.LEVEL_NODE: locking.ALL_SET,
611
      locking.LEVEL_INSTANCE: locking.ALL_SET,
612
    }
613
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
614

    
615
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616
                  node_result, feedback_fn, master_files):
617
    """Run multiple tests against a node.
618

619
    Test list:
620

621
      - compares ganeti version
622
      - checks vg existence and size > 20G
623
      - checks config file checksum
624
      - checks ssh to other nodes
625

626
    @type nodeinfo: L{objects.Node}
627
    @param nodeinfo: the node to check
628
    @param file_list: required list of files
629
    @param local_cksum: dictionary of local files and their checksums
630
    @param node_result: the results from the node
631
    @param feedback_fn: function used to accumulate results
632
    @param master_files: list of files that only masters should have
633

634
    """
635
    node = nodeinfo.name
636

    
637
    # main result, node_result should be a non-empty dict
638
    if not node_result or not isinstance(node_result, dict):
639
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
640
      return True
641

    
642
    # compares ganeti version
643
    local_version = constants.PROTOCOL_VERSION
644
    remote_version = node_result.get('version', None)
645
    if not remote_version:
646
      feedback_fn("  - ERROR: connection to %s failed" % (node))
647
      return True
648

    
649
    if local_version != remote_version:
650
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
651
                      (local_version, node, remote_version))
652
      return True
653

    
654
    # checks vg existence and size > 20G
655

    
656
    bad = False
657
    vglist = node_result.get(constants.NV_VGLIST, None)
658
    if not vglist:
659
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
660
                      (node,))
661
      bad = True
662
    else:
663
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664
                                            constants.MIN_VG_SIZE)
665
      if vgstatus:
666
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
667
        bad = True
668

    
669
    # checks config file checksum
670

    
671
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
672
    if not isinstance(remote_cksum, dict):
673
      bad = True
674
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
675
    else:
676
      for file_name in file_list:
677
        node_is_mc = nodeinfo.master_candidate
678
        must_have_file = file_name not in master_files
679
        if file_name not in remote_cksum:
680
          if node_is_mc or must_have_file:
681
            bad = True
682
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
683
        elif remote_cksum[file_name] != local_cksum[file_name]:
684
          if node_is_mc or must_have_file:
685
            bad = True
686
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
687
          else:
688
            # not candidate and this is not a must-have file
689
            bad = True
690
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
691
                        " '%s'" % file_name)
692
        else:
693
          # all good, except non-master/non-must have combination
694
          if not node_is_mc and not must_have_file:
695
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
696
                        " candidates" % file_name)
697

    
698
    # checks ssh to any
699

    
700
    if constants.NV_NODELIST not in node_result:
701
      bad = True
702
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
703
    else:
704
      if node_result[constants.NV_NODELIST]:
705
        bad = True
706
        for node in node_result[constants.NV_NODELIST]:
707
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
708
                          (node, node_result[constants.NV_NODELIST][node]))
709

    
710
    if constants.NV_NODENETTEST not in node_result:
711
      bad = True
712
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
713
    else:
714
      if node_result[constants.NV_NODENETTEST]:
715
        bad = True
716
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
717
        for node in nlist:
718
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
719
                          (node, node_result[constants.NV_NODENETTEST][node]))
720

    
721
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722
    if isinstance(hyp_result, dict):
723
      for hv_name, hv_result in hyp_result.iteritems():
724
        if hv_result is not None:
725
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
726
                      (hv_name, hv_result))
727
    return bad
728

    
729
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730
                      node_instance, feedback_fn, n_offline):
731
    """Verify an instance.
732

733
    This function checks to see if the required block devices are
734
    available on the instance's node.
735

736
    """
737
    bad = False
738

    
739
    node_current = instanceconfig.primary_node
740

    
741
    node_vol_should = {}
742
    instanceconfig.MapLVsByNode(node_vol_should)
743

    
744
    for node in node_vol_should:
745
      if node in n_offline:
746
        # ignore missing volumes on offline nodes
747
        continue
748
      for volume in node_vol_should[node]:
749
        if node not in node_vol_is or volume not in node_vol_is[node]:
750
          feedback_fn("  - ERROR: volume %s missing on node %s" %
751
                          (volume, node))
752
          bad = True
753

    
754
    if not instanceconfig.status == 'down':
755
      if ((node_current not in node_instance or
756
          not instance in node_instance[node_current]) and
757
          node_current not in n_offline):
758
        feedback_fn("  - ERROR: instance %s not running on node %s" %
759
                        (instance, node_current))
760
        bad = True
761

    
762
    for node in node_instance:
763
      if (not node == node_current):
764
        if instance in node_instance[node]:
765
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
766
                          (instance, node))
767
          bad = True
768

    
769
    return bad
770

    
771
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772
    """Verify if there are any unknown volumes in the cluster.
773

774
    The .os, .swap and backup volumes are ignored. All other volumes are
775
    reported as unknown.
776

777
    """
778
    bad = False
779

    
780
    for node in node_vol_is:
781
      for volume in node_vol_is[node]:
782
        if node not in node_vol_should or volume not in node_vol_should[node]:
783
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
784
                      (volume, node))
785
          bad = True
786
    return bad
787

    
788
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789
    """Verify the list of running instances.
790

791
    This checks what instances are running but unknown to the cluster.
792

793
    """
794
    bad = False
795
    for node in node_instance:
796
      for runninginstance in node_instance[node]:
797
        if runninginstance not in instancelist:
798
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
799
                          (runninginstance, node))
800
          bad = True
801
    return bad
802

    
803
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
804
    """Verify N+1 Memory Resilience.
805

806
    Check that if one single node dies we can still start all the instances it
807
    was primary for.
808

809
    """
810
    bad = False
811

    
812
    for node, nodeinfo in node_info.iteritems():
813
      # This code checks that every node which is now listed as secondary has
814
      # enough memory to host all instances it is supposed to, should a single
815
      # other node in the cluster fail.
816
      # FIXME: not ready for failover to an arbitrary node
817
      # FIXME: does not support file-backed instances
818
      # WARNING: we currently take into account down instances as well as up
819
      # ones, considering that even if they're down someone might want to start
820
      # them even in the event of a node failure.
821
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
822
        needed_mem = 0
823
        for instance in instances:
824
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
825
          if bep[constants.BE_AUTO_BALANCE]:
826
            needed_mem += bep[constants.BE_MEMORY]
827
        if nodeinfo['mfree'] < needed_mem:
828
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
829
                      " failovers should node %s fail" % (node, prinode))
830
          bad = True
831
    return bad
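  # Worked example, not part of the original code: if a node reports 2048 MB
  # of free memory ('mfree') and is secondary for two auto-balanced instances
  # sharing the same primary node and needing 1024 MB and 1536 MB
  # respectively, then needed_mem for that primary is 2560 MB > 2048 MB, so
  # the loop above reports the node as not N+1 compliant for a failure of
  # that primary.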
832

    
833
  def CheckPrereq(self):
834
    """Check prerequisites.
835

836
    Transform the list of checks we're going to skip into a set and check that
837
    all its members are valid.
838

839
    """
840
    self.skip_set = frozenset(self.op.skip_checks)
841
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
842
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
843

    
844
  def BuildHooksEnv(self):
845
    """Build hooks env.
846

847
    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.
849

850
    """
851
    all_nodes = self.cfg.GetNodeList()
852
    # TODO: populate the environment with useful information for verify hooks
853
    env = {}
854
    return env, [], all_nodes
855

    
856
  def Exec(self, feedback_fn):
857
    """Verify integrity of cluster, performing various test on nodes.
858

859
    """
860
    bad = False
861
    feedback_fn("* Verifying global settings")
862
    for msg in self.cfg.VerifyConfig():
863
      feedback_fn("  - ERROR: %s" % msg)
864

    
865
    vg_name = self.cfg.GetVGName()
866
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
867
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
868
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
869
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
870
    i_non_redundant = [] # Non redundant instances
871
    i_non_a_balanced = [] # Non auto-balanced instances
872
    n_offline = [] # List of offline nodes
873
    node_volume = {}
874
    node_instance = {}
875
    node_info = {}
876
    instance_cfg = {}
877

    
878
    # FIXME: verify OS list
879
    # do local checksums
880
    master_files = [constants.CLUSTER_CONF_FILE]
881

    
882
    file_names = ssconf.SimpleStore().GetFileList()
883
    file_names.append(constants.SSL_CERT_FILE)
884
    file_names.extend(master_files)
885

    
886
    local_checksums = utils.FingerprintFiles(file_names)
887

    
888
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
889
    node_verify_param = {
890
      constants.NV_FILELIST: file_names,
891
      constants.NV_NODELIST: nodelist,
892
      constants.NV_HYPERVISOR: hypervisors,
893
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
894
                                  node.secondary_ip) for node in nodeinfo],
895
      constants.NV_LVLIST: vg_name,
896
      constants.NV_INSTANCELIST: hypervisors,
897
      constants.NV_VGLIST: None,
898
      constants.NV_VERSION: None,
899
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
900
      }
901
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
902
                                           self.cfg.GetClusterName())
903

    
904
    cluster = self.cfg.GetClusterInfo()
905
    master_node = self.cfg.GetMasterNode()
906
    for node_i in nodeinfo:
907
      node = node_i.name
908
      nresult = all_nvinfo[node].data
909

    
910
      if node_i.offline:
911
        feedback_fn("* Skipping offline node %s" % (node,))
912
        n_offline.append(node)
913
        continue
914

    
915
      if node == master_node:
916
        ntype = "master"
917
      elif node_i.master_candidate:
918
        ntype = "master candidate"
919
      else:
920
        ntype = "regular"
921
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
922

    
923
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
924
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
925
        bad = True
926
        continue
927

    
928
      result = self._VerifyNode(node_i, file_names, local_checksums,
929
                                nresult, feedback_fn, master_files)
930
      bad = bad or result
931

    
932
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
933
      if isinstance(lvdata, basestring):
934
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
935
                    (node, lvdata.encode('string_escape')))
936
        bad = True
937
        node_volume[node] = {}
938
      elif not isinstance(lvdata, dict):
939
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
940
        bad = True
941
        continue
942
      else:
943
        node_volume[node] = lvdata
944

    
945
      # node_instance
946
      idata = nresult.get(constants.NV_INSTANCELIST, None)
947
      if not isinstance(idata, list):
948
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
949
                    (node,))
950
        bad = True
951
        continue
952

    
953
      node_instance[node] = idata
954

    
955
      # node_info
956
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
957
      if not isinstance(nodeinfo, dict):
958
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
959
        bad = True
960
        continue
961

    
962
      try:
963
        node_info[node] = {
964
          "mfree": int(nodeinfo['memory_free']),
965
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
966
          "pinst": [],
967
          "sinst": [],
968
          # dictionary holding all instances this node is secondary for,
969
          # grouped by their primary node. Each key is a cluster node, and each
970
          # value is a list of instances which have the key as primary and the
971
          # current node as secondary.  this is handy to calculate N+1 memory
972
          # availability if you can only failover from a primary to its
973
          # secondary.
974
          "sinst-by-pnode": {},
975
        }
976
      except ValueError:
977
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
978
        bad = True
979
        continue
980

    
981
    node_vol_should = {}
982

    
983
    for instance in instancelist:
984
      feedback_fn("* Verifying instance %s" % instance)
985
      inst_config = self.cfg.GetInstanceInfo(instance)
986
      result =  self._VerifyInstance(instance, inst_config, node_volume,
987
                                     node_instance, feedback_fn, n_offline)
988
      bad = bad or result
989

    
990
      inst_config.MapLVsByNode(node_vol_should)
991

    
992
      instance_cfg[instance] = inst_config
993

    
994
      pnode = inst_config.primary_node
995
      if pnode in node_info:
996
        node_info[pnode]['pinst'].append(instance)
997
      elif pnode not in n_offline:
998
        feedback_fn("  - ERROR: instance %s, connection to primary node"
999
                    " %s failed" % (instance, pnode))
1000
        bad = True
1001

    
1002
      # If the instance is non-redundant we cannot survive losing its primary
1003
      # node, so we are not N+1 compliant. On the other hand we have no disk
1004
      # templates with more than one secondary so that situation is not well
1005
      # supported either.
1006
      # FIXME: does not support file-backed instances
1007
      if len(inst_config.secondary_nodes) == 0:
1008
        i_non_redundant.append(instance)
1009
      elif len(inst_config.secondary_nodes) > 1:
1010
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1011
                    % instance)
1012

    
1013
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1014
        i_non_a_balanced.append(instance)
1015

    
1016
      for snode in inst_config.secondary_nodes:
1017
        if snode in node_info:
1018
          node_info[snode]['sinst'].append(instance)
1019
          if pnode not in node_info[snode]['sinst-by-pnode']:
1020
            node_info[snode]['sinst-by-pnode'][pnode] = []
1021
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1022
        elif snode not in n_offline:
1023
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1024
                      " %s failed" % (instance, snode))
1025

    
1026
    feedback_fn("* Verifying orphan volumes")
1027
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1028
                                       feedback_fn)
1029
    bad = bad or result
1030

    
1031
    feedback_fn("* Verifying remaining instances")
1032
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1033
                                         feedback_fn)
1034
    bad = bad or result
1035

    
1036
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1037
      feedback_fn("* Verifying N+1 Memory redundancy")
1038
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1039
      bad = bad or result
1040

    
1041
    feedback_fn("* Other Notes")
1042
    if i_non_redundant:
1043
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1044
                  % len(i_non_redundant))
1045

    
1046
    if i_non_a_balanced:
1047
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1048
                  % len(i_non_a_balanced))
1049

    
1050
    if n_offline:
1051
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1052

    
1053
    return not bad
1054

    
1055
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1056
    """Analize the post-hooks' result
1057

1058
    This method analyses the hook result, handles it, and sends some
1059
    nicely-formatted feedback back to the user.
1060

1061
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1062
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1063
    @param hooks_results: the results of the multi-node hooks rpc call
1064
    @param feedback_fn: function used to send feedback back to the caller
1065
    @param lu_result: previous Exec result
1066
    @return: the new Exec result, based on the previous result
1067
        and hook results
1068

1069
    """
1070
    # We only really run POST phase hooks, and are only interested in
1071
    # their results
1072
    if phase == constants.HOOKS_PHASE_POST:
1073
      # Used to change hooks' output to proper indentation
1074
      indent_re = re.compile('^', re.M)
1075
      feedback_fn("* Hooks Results")
1076
      if not hooks_results:
1077
        feedback_fn("  - ERROR: general communication failure")
1078
        lu_result = 1
1079
      else:
1080
        for node_name in hooks_results:
1081
          show_node_header = True
1082
          res = hooks_results[node_name]
1083
          if res.failed or res.data is False or not isinstance(res.data, list):
1084
            if res.offline:
1085
              # no need to warn or set fail return value
1086
              continue
1087
            feedback_fn("    Communication failure in hooks execution")
1088
            lu_result = 1
1089
            continue
1090
          for script, hkr, output in res.data:
1091
            if hkr == constants.HKR_FAIL:
1092
              # The node header is only shown once, if there are
1093
              # failing hooks on that node
1094
              if show_node_header:
1095
                feedback_fn("  Node %s:" % node_name)
1096
                show_node_header = False
1097
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1098
              output = indent_re.sub('      ', output)
1099
              feedback_fn("%s" % output)
1100
              lu_result = 1
1101

    
1102
      return lu_result
1103

    
1104

    
1105
class LUVerifyDisks(NoHooksLU):
1106
  """Verifies the cluster disks status.
1107

1108
  """
1109
  _OP_REQP = []
1110
  REQ_BGL = False
1111

    
1112
  def ExpandNames(self):
1113
    self.needed_locks = {
1114
      locking.LEVEL_NODE: locking.ALL_SET,
1115
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1116
    }
1117
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1118

    
1119
  def CheckPrereq(self):
1120
    """Check prerequisites.
1121

1122
    This has no prerequisites.
1123

1124
    """
1125
    pass
1126

    
1127
  def Exec(self, feedback_fn):
1128
    """Verify integrity of cluster disks.
1129

1130
    """
1131
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1132

    
1133
    vg_name = self.cfg.GetVGName()
1134
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1135
    instances = [self.cfg.GetInstanceInfo(name)
1136
                 for name in self.cfg.GetInstanceList()]
1137

    
1138
    nv_dict = {}
1139
    for inst in instances:
1140
      inst_lvs = {}
1141
      if (inst.status != "up" or
1142
          inst.disk_template not in constants.DTS_NET_MIRROR):
1143
        continue
1144
      inst.MapLVsByNode(inst_lvs)
1145
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1146
      for node, vol_list in inst_lvs.iteritems():
1147
        for vol in vol_list:
1148
          nv_dict[(node, vol)] = inst
1149

    
1150
    if not nv_dict:
1151
      return result
1152

    
1153
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1154

    
1155
    to_act = set()
1156
    for node in nodes:
1157
      # node_volume
1158
      lvs = node_lvs[node]
1159
      if lvs.failed:
1160
        if not lvs.offline:
1161
          self.LogWarning("Connection to node %s failed: %s" %
1162
                          (node, lvs.data))
1163
        continue
1164
      lvs = lvs.data
1165
      if isinstance(lvs, basestring):
1166
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1167
        res_nlvm[node] = lvs
1168
      elif not isinstance(lvs, dict):
1169
        logging.warning("Connection to node %s failed or invalid data"
1170
                        " returned", node)
1171
        res_nodes.append(node)
1172
        continue
1173

    
1174
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1175
        inst = nv_dict.pop((node, lv_name), None)
1176
        if (not lv_online and inst is not None
1177
            and inst.name not in res_instances):
1178
          res_instances.append(inst.name)
1179

    
1180
    # any leftover items in nv_dict are missing LVs, let's arrange the
1181
    # data better
1182
    for key, inst in nv_dict.iteritems():
1183
      if inst.name not in res_missing:
1184
        res_missing[inst.name] = []
1185
      res_missing[inst.name].append(key)
1186

    
1187
    return result
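  # Illustrative sketch, not part of the original code: the value returned
  # above is the 4-tuple (res_nodes, res_nlvm, res_instances, res_missing),
  # for example (all names made up):
  #
  #   (["node3.example.com"],                   # nodes with no/invalid data
  #    {"node4.example.com": "error message"},  # per-node LVM errors
  #    ["instance2.example.com"],               # instances with offline LVs
  #    {"instance3.example.com": [("node1.example.com", "lv_name")]})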
1188

    
1189

    
1190
class LURenameCluster(LogicalUnit):
1191
  """Rename the cluster.
1192

1193
  """
1194
  HPATH = "cluster-rename"
1195
  HTYPE = constants.HTYPE_CLUSTER
1196
  _OP_REQP = ["name"]
1197

    
1198
  def BuildHooksEnv(self):
1199
    """Build hooks env.
1200

1201
    """
1202
    env = {
1203
      "OP_TARGET": self.cfg.GetClusterName(),
1204
      "NEW_NAME": self.op.name,
1205
      }
1206
    mn = self.cfg.GetMasterNode()
1207
    return env, [mn], [mn]
1208

    
1209
  def CheckPrereq(self):
1210
    """Verify that the passed name is a valid one.
1211

1212
    """
1213
    hostname = utils.HostInfo(self.op.name)
1214

    
1215
    new_name = hostname.name
1216
    self.ip = new_ip = hostname.ip
1217
    old_name = self.cfg.GetClusterName()
1218
    old_ip = self.cfg.GetMasterIP()
1219
    if new_name == old_name and new_ip == old_ip:
1220
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1221
                                 " cluster has changed")
1222
    if new_ip != old_ip:
1223
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1224
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1225
                                   " reachable on the network. Aborting." %
1226
                                   new_ip)
1227

    
1228
    self.op.name = new_name
1229

    
1230
  def Exec(self, feedback_fn):
1231
    """Rename the cluster.
1232

1233
    """
1234
    clustername = self.op.name
1235
    ip = self.ip
1236

    
1237
    # shutdown the master IP
1238
    master = self.cfg.GetMasterNode()
1239
    result = self.rpc.call_node_stop_master(master, False)
1240
    if result.failed or not result.data:
1241
      raise errors.OpExecError("Could not disable the master role")
1242

    
1243
    try:
1244
      cluster = self.cfg.GetClusterInfo()
1245
      cluster.cluster_name = clustername
1246
      cluster.master_ip = ip
1247
      self.cfg.Update(cluster)
1248

    
1249
      # update the known hosts file
1250
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1251
      node_list = self.cfg.GetNodeList()
1252
      try:
1253
        node_list.remove(master)
1254
      except ValueError:
1255
        pass
1256
      result = self.rpc.call_upload_file(node_list,
1257
                                         constants.SSH_KNOWN_HOSTS_FILE)
1258
      for to_node, to_result in result.iteritems():
1259
        if to_result.failed or not to_result.data:
1260
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1261

    
1262
    finally:
1263
      result = self.rpc.call_node_start_master(master, False)
1264
      if result.failed or not result.data:
1265
        self.LogWarning("Could not re-enable the master role on"
1266
                        " the master, please restart manually.")
1267

    
1268

    
1269
def _RecursiveCheckIfLVMBased(disk):
1270
  """Check if the given disk or its children are lvm-based.
1271

1272
  @type disk: L{objects.Disk}
1273
  @param disk: the disk to check
1274
  @rtype: boolean
1275
  @return: boolean indicating whether a LD_LV dev_type was found or not
1276

1277
  """
1278
  if disk.children:
1279
    for chdisk in disk.children:
1280
      if _RecursiveCheckIfLVMBased(chdisk):
1281
        return True
1282
  return disk.dev_type == constants.LD_LV
1283

    
1284

    
1285
class LUSetClusterParams(LogicalUnit):
1286
  """Change the parameters of the cluster.
1287

1288
  """
1289
  HPATH = "cluster-modify"
1290
  HTYPE = constants.HTYPE_CLUSTER
1291
  _OP_REQP = []
1292
  REQ_BGL = False
1293

    
1294
  def CheckParameters(self):
1295
    """Check parameters
1296

1297
    """
1298
    if not hasattr(self.op, "candidate_pool_size"):
1299
      self.op.candidate_pool_size = None
1300
    if self.op.candidate_pool_size is not None:
1301
      try:
1302
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1303
      except ValueError, err:
1304
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1305
                                   str(err))
1306
      if self.op.candidate_pool_size < 1:
1307
        raise errors.OpPrereqError("At least one master candidate needed")
1308

    
1309
  def ExpandNames(self):
1310
    # FIXME: in the future maybe other cluster params won't require checking on
1311
    # all nodes to be modified.
1312
    self.needed_locks = {
1313
      locking.LEVEL_NODE: locking.ALL_SET,
1314
    }
1315
    self.share_locks[locking.LEVEL_NODE] = 1
1316

    
1317
  def BuildHooksEnv(self):
1318
    """Build hooks env.
1319

1320
    """
1321
    env = {
1322
      "OP_TARGET": self.cfg.GetClusterName(),
1323
      "NEW_VG_NAME": self.op.vg_name,
1324
      }
1325
    mn = self.cfg.GetMasterNode()
1326
    return env, [mn], [mn]
1327

    
1328
  def CheckPrereq(self):
1329
    """Check prerequisites.
1330

1331
    This checks whether the given params don't conflict and
1332
    if the given volume group is valid.
1333

1334
    """
1335
    # FIXME: This only works because there is only one parameter that can be
1336
    # changed or removed.
1337
    if self.op.vg_name is not None and not self.op.vg_name:
1338
      instances = self.cfg.GetAllInstancesInfo().values()
1339
      for inst in instances:
1340
        for disk in inst.disks:
1341
          if _RecursiveCheckIfLVMBased(disk):
1342
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1343
                                       " lvm-based instances exist")
1344

    
1345
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1346

    
1347
    # if vg_name not None, checks given volume group on all nodes
1348
    if self.op.vg_name:
1349
      vglist = self.rpc.call_vg_list(node_list)
1350
      for node in node_list:
1351
        if vglist[node].failed:
1352
          # ignoring down node
1353
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1354
          continue
1355
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1356
                                              self.op.vg_name,
1357
                                              constants.MIN_VG_SIZE)
1358
        if vgstatus:
1359
          raise errors.OpPrereqError("Error on node '%s': %s" %
1360
                                     (node, vgstatus))
1361

    
1362
    self.cluster = cluster = self.cfg.GetClusterInfo()
1363
    # validate beparams changes
1364
    if self.op.beparams:
1365
      utils.CheckBEParams(self.op.beparams)
1366
      self.new_beparams = cluster.FillDict(
1367
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1368

    
1369
    # hypervisor list/parameters
1370
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1371
    if self.op.hvparams:
1372
      if not isinstance(self.op.hvparams, dict):
1373
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1374
      for hv_name, hv_dict in self.op.hvparams.items():
1375
        if hv_name not in self.new_hvparams:
1376
          self.new_hvparams[hv_name] = hv_dict
1377
        else:
1378
          self.new_hvparams[hv_name].update(hv_dict)
1379

    
1380
    if self.op.enabled_hypervisors is not None:
1381
      self.hv_list = self.op.enabled_hypervisors
1382
    else:
1383
      self.hv_list = cluster.enabled_hypervisors
1384

    
1385
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1386
      # either the enabled list has changed, or the parameters have, validate
1387
      for hv_name, hv_params in self.new_hvparams.items():
1388
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1389
            (self.op.enabled_hypervisors and
1390
             hv_name in self.op.enabled_hypervisors)):
1391
          # either this is a new hypervisor, or its parameters have changed
1392
          hv_class = hypervisor.GetHypervisor(hv_name)
1393
          hv_class.CheckParameterSyntax(hv_params)
1394
          _CheckHVParams(self, node_list, hv_name, hv_params)
1395

    
1396
  def Exec(self, feedback_fn):
1397
    """Change the parameters of the cluster.
1398

1399
    """
1400
    if self.op.vg_name is not None:
1401
      if self.op.vg_name != self.cfg.GetVGName():
1402
        self.cfg.SetVGName(self.op.vg_name)
1403
      else:
1404
        feedback_fn("Cluster LVM configuration already in desired"
1405
                    " state, not changing")
1406
    if self.op.hvparams:
1407
      self.cluster.hvparams = self.new_hvparams
1408
    if self.op.enabled_hypervisors is not None:
1409
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1410
    if self.op.beparams:
1411
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1412
    if self.op.candidate_pool_size is not None:
1413
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1414

    
1415
    self.cfg.Update(self.cluster)
1416

    
1417
    # we want to update nodes after the cluster so that if any errors
1418
    # happen, we have recorded and saved the cluster info
1419
    if self.op.candidate_pool_size is not None:
1420
      _AdjustCandidatePool(self)
1421

    
1422

    
1423
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


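# The per-disk status unpacked in _WaitForSync above is a four-element tuple
# (sync_percent, estimated_time, is_degraded, ldisk); for example a disk that
# is 42.5% resynced with roughly 300 seconds to go would appear as
# (42.5, 300, True, False), while a fully synced disk reports None for the
# percentage (the numbers here are only illustrative).
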
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


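# _CheckDiskConsistency above consults a single field of the status returned
# by call_blockdev_find: index 5 (the overall is_degraded flag) by default, or
# index 6 (ldisk, the local storage status) when ldisk=True; the other fields
# of the status tuple are ignored at this level.
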
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @returns: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes if the
    # list is non-empty; if it's empty there is nothing to validate
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


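# For reference, a live_data entry built in LUQueryNodes.Exec above looks
# roughly like (the numbers are hypothetical):
#   {"mtotal": 4096, "mnode": 512, "mfree": 2048,
#    "dtotal": 102400, "dfree": 51200, "ctotal": 4, "bootid": "..."}
# i.e. its keys mirror exactly the dynamic field names accepted by this LU.
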
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


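# A row produced by LUQueryNodeVolumes.Exec above, for output_fields
# ["node", "phys", "vg", "name", "size", "instance"], would look roughly like
#   ["node1.example.com", "/dev/sdb1", "xenvg", "inst1-sda", "10240", "inst1"]
# with every value converted to a string (these particular values are made up
# purely for illustration).
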
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    node_info = self.cfg.GetAllNodesInfo().values()
    mc_now, _ = self.cfg.GetMasterCandidateStats()
    master_candidate = mc_now < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate,
                                 offline=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if result.failed or not result.data:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if result[node].failed or not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    if self.op.master_candidate is None and self.op.offline is None:
      raise errors.OpPrereqError("Please pass at least one modification")
    if self.op.offline == True and self.op.master_candidate == True:
      raise errors.OpPrereqError("Can't set the node into offline and"
                                 " master_candidate at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the requested flag changes can be applied to the node.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True)
        and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate and online")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      node_info = self.cfg.GetAllNodesInfo().values()
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and node.offline and
        not self.op.offline == False):
      raise errors.OpPrereqError("Can't set an offline node to"
                                 " master_candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True and node.master_candidate:
        node.master_candidate = False
        result.append(("master_candidate", "auto-demotion due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        if (rrc.failed or not isinstance(rrc.data, (tuple, list))
            or len(rrc.data) != 2):
          self.LogWarning("Node rpc error: %s" % rrc.error)
        elif not rrc.data[0]:
          self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if self.op.node_name != self.cfg.GetMasterNode():
      self.context.ReaddNode(node)

    return result


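# The value returned by LUSetNodeParams.Exec above is a list of
# (parameter, new value) pairs describing what was changed, for example
#   [("offline", "True"), ("master_candidate", "auto-demotion due to offline")]
# for a node that was taken offline and therefore demoted.
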
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": cluster.hvparams,
      "beparams": cluster.beparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if result.failed or not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1)",
                           inst_disk.iv_name, node)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if result.failed or not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2)",
                           inst_disk.iv_name, node)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


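# Each entry appended to device_info in the second pass above is a triple of
# (primary node, instance-visible disk name, result of the assemble call),
# e.g. roughly ("node1.example.com", "sda", <blockdev_assemble result>), so
# callers such as LUActivateInstanceDisks can report the mapping back to the
# user (the example values here are illustrative only).
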
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if ins_l.failed or not isinstance(ins_l.data, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true; errors on other nodes always mark the operation as failed.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      if result.failed or not result.data:
        logging.error("Could not shutdown block device %s on node %s",
                      disk.iv_name, node)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  nodeinfo[node].Raise()
  free_mem = nodeinfo[node].data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


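# A typical call, as used by the start-up code below, checks the instance's
# memory requirement against its primary node before touching it:
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
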
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance, extra_args)
    if result.failed or not result.data:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type, extra_args)
      if result.failed or not result.data:
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
      if result.failed or not result.data:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    if result.failed or not result.data:
      self.proc.LogWarning("Could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      if result.failed or not result.data:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
2921
  """Remove an instance.
2922

2923
  """
2924
  HPATH = "instance-remove"
2925
  HTYPE = constants.HTYPE_INSTANCE
2926
  _OP_REQP = ["instance_name", "ignore_failures"]
2927
  REQ_BGL = False
2928

    
2929
  def ExpandNames(self):
2930
    self._ExpandAndLockInstance()
2931
    self.needed_locks[locking.LEVEL_NODE] = []
2932
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2933

    
2934
  def DeclareLocks(self, level):
2935
    if level == locking.LEVEL_NODE:
2936
      self._LockInstancesNodes()
2937

    
2938
  def BuildHooksEnv(self):
2939
    """Build hooks env.
2940

2941
    This runs on master, primary and secondary nodes of the instance.
2942

2943
    """
2944
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2945
    nl = [self.cfg.GetMasterNode()]
2946
    return env, nl, nl
2947

    
2948
  def CheckPrereq(self):
2949
    """Check prerequisites.
2950

2951
    This checks that the instance is in the cluster.
2952

2953
    """
2954
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2955
    assert self.instance is not None, \
2956
      "Cannot retrieve locked instance %s" % self.op.instance_name
2957

    
2958
  def Exec(self, feedback_fn):
2959
    """Remove the instance.
2960

2961
    """
2962
    instance = self.instance
2963
    logging.info("Shutting down instance %s on node %s",
2964
                 instance.name, instance.primary_node)
2965

    
2966
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2967
    if result.failed or not result.data:
2968
      if self.op.ignore_failures:
2969
        feedback_fn("Warning: can't shutdown instance")
2970
      else:
2971
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2972
                                 (instance.name, instance.primary_node))
2973

    
2974
    logging.info("Removing block devices for instance %s", instance.name)
2975

    
2976
    if not _RemoveDisks(self, instance):
2977
      if self.op.ignore_failures:
2978
        feedback_fn("Warning: can't remove instance's disks")
2979
      else:
2980
        raise errors.OpExecError("Can't remove instance's disks")
2981

    
2982
    logging.info("Removing instance %s out of cluster config", instance.name)
2983

    
2984
    self.cfg.RemoveInstance(instance.name)
2985
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2986

    
2987

    
2988
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state", "admin_ram",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    "(disk).(size)/([0-9]+)",
                                    "(disk).(sizes)",
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
                                    "(nic).(macs|ips|bridges)",
                                    "(disk|nic).(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
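  # Illustrative note: the parenthesised entries above are regular-expression
  # patterns, so a query such as
  #   gnt-instance list -o name,disk.count,disk.size/0,nic.macs,be/memory
  # resolves "disk.size/0" against "(disk).(size)/([0-9]+)" and "disk.count"
  # against "(disk|nic).(count)", while "oper_state", "oper_ram" and "status"
  # are dynamic fields that need live data and therefore enable locking in
  # ExpandNames below.  The exact hv/* and be/* names depend on
  # constants.HVS_PARAMETERS and constants.BES_PARAMETERS for this version.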

    
3011

    
3012
  def ExpandNames(self):
3013
    _CheckOutputFields(static=self._FIELDS_STATIC,
3014
                       dynamic=self._FIELDS_DYNAMIC,
3015
                       selected=self.op.output_fields)
3016

    
3017
    self.needed_locks = {}
3018
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3019
    self.share_locks[locking.LEVEL_NODE] = 1
3020

    
3021
    if self.op.names:
3022
      self.wanted = _GetWantedInstances(self, self.op.names)
3023
    else:
3024
      self.wanted = locking.ALL_SET
3025

    
3026
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3027
    if self.do_locking:
3028
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3029
      self.needed_locks[locking.LEVEL_NODE] = []
3030
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3031

    
3032
  def DeclareLocks(self, level):
3033
    if level == locking.LEVEL_NODE and self.do_locking:
3034
      self._LockInstancesNodes()
3035

    
3036
  def CheckPrereq(self):
3037
    """Check prerequisites.
3038

3039
    """
3040
    pass
3041

    
3042
  def Exec(self, feedback_fn):
3043
    """Computes the list of nodes and their attributes.
3044

3045
    """
3046
    all_info = self.cfg.GetAllInstancesInfo()
3047
    if self.do_locking:
3048
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3049
    elif self.wanted != locking.ALL_SET:
3050
      instance_names = self.wanted
3051
      missing = set(instance_names).difference(all_info.keys())
3052
      if missing:
3053
        raise errors.OpExecError(
3054
          "Some instances were removed before retrieving their data: %s"
3055
          % missing)
3056
    else:
3057
      instance_names = all_info.keys()
3058

    
3059
    instance_names = utils.NiceSort(instance_names)
3060
    instance_list = [all_info[iname] for iname in instance_names]
3061

    
3062
    # begin data gathering
3063

    
3064
    nodes = frozenset([inst.primary_node for inst in instance_list])
3065
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3066

    
3067
    bad_nodes = []
3068
    off_nodes = []
3069
    if self.do_locking:
3070
      live_data = {}
3071
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3072
      for name in nodes:
3073
        result = node_data[name]
3074
        if result.offline:
3075
          # offline nodes will be in both lists
3076
          off_nodes.append(name)
3077
        if result.failed:
3078
          bad_nodes.append(name)
3079
        else:
3080
          if result.data:
3081
            live_data.update(result.data)
3082
            # else no instance is alive
3083
    else:
3084
      live_data = dict([(name, {}) for name in instance_names])
3085

    
3086
    # end data gathering
3087

    
3088
    HVPREFIX = "hv/"
3089
    BEPREFIX = "be/"
3090
    output = []
3091
    for instance in instance_list:
3092
      iout = []
3093
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3094
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3095
      for field in self.op.output_fields:
3096
        st_match = self._FIELDS_STATIC.Matches(field)
3097
        if field == "name":
3098
          val = instance.name
3099
        elif field == "os":
3100
          val = instance.os
3101
        elif field == "pnode":
3102
          val = instance.primary_node
3103
        elif field == "snodes":
3104
          val = list(instance.secondary_nodes)
3105
        elif field == "admin_state":
3106
          val = (instance.status != "down")
3107
        elif field == "oper_state":
3108
          if instance.primary_node in bad_nodes:
3109
            val = None
3110
          else:
3111
            val = bool(live_data.get(instance.name))
3112
        elif field == "status":
3113
          if instance.primary_node in off_nodes:
3114
            val = "ERROR_nodeoffline"
3115
          elif instance.primary_node in bad_nodes:
3116
            val = "ERROR_nodedown"
3117
          else:
3118
            running = bool(live_data.get(instance.name))
3119
            if running:
3120
              if instance.status != "down":
3121
                val = "running"
3122
              else:
3123
                val = "ERROR_up"
3124
            else:
3125
              if instance.status != "down":
3126
                val = "ERROR_down"
3127
              else:
3128
                val = "ADMIN_down"
3129
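        # Summary of the possible "status" values computed above: "running"
        # and "ADMIN_down" mean the observed state matches the configuration,
        # while "ERROR_up", "ERROR_down", "ERROR_nodedown" and
        # "ERROR_nodeoffline" indicate a mismatch or an unreachable/offline
        # primary node.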
        elif field == "oper_ram":
3130
          if instance.primary_node in bad_nodes:
3131
            val = None
3132
          elif instance.name in live_data:
3133
            val = live_data[instance.name].get("memory", "?")
3134
          else:
3135
            val = "-"
3136
        elif field == "disk_template":
3137
          val = instance.disk_template
3138
        elif field == "ip":
3139
          val = instance.nics[0].ip
3140
        elif field == "bridge":
3141
          val = instance.nics[0].bridge
3142
        elif field == "mac":
3143
          val = instance.nics[0].mac
3144
        elif field == "sda_size" or field == "sdb_size":
3145
          idx = ord(field[2]) - ord('a')
3146
          try:
3147
            val = instance.FindDisk(idx).size
3148
          except errors.OpPrereqError:
3149
            val = None
3150
        elif field == "tags":
3151
          val = list(instance.GetTags())
3152
        elif field == "serial_no":
3153
          val = instance.serial_no
3154
        elif field == "network_port":
3155
          val = instance.network_port
3156
        elif field == "hypervisor":
3157
          val = instance.hypervisor
3158
        elif field == "hvparams":
3159
          val = i_hv
3160
        elif (field.startswith(HVPREFIX) and
3161
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3162
          val = i_hv.get(field[len(HVPREFIX):], None)
3163
        elif field == "beparams":
3164
          val = i_be
3165
        elif (field.startswith(BEPREFIX) and
3166
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3167
          val = i_be.get(field[len(BEPREFIX):], None)
3168
        elif st_match and st_match.groups():
3169
          # matches a variable list
3170
          st_groups = st_match.groups()
3171
          if st_groups and st_groups[0] == "disk":
3172
            if st_groups[1] == "count":
3173
              val = len(instance.disks)
3174
            elif st_groups[1] == "sizes":
3175
              val = [disk.size for disk in instance.disks]
3176
            elif st_groups[1] == "size":
3177
              try:
3178
                val = instance.FindDisk(st_groups[2]).size
3179
              except errors.OpPrereqError:
3180
                val = None
3181
            else:
3182
              assert False, "Unhandled disk parameter"
3183
          elif st_groups[0] == "nic":
3184
            if st_groups[1] == "count":
3185
              val = len(instance.nics)
3186
            elif st_groups[1] == "macs":
3187
              val = [nic.mac for nic in instance.nics]
3188
            elif st_groups[1] == "ips":
3189
              val = [nic.ip for nic in instance.nics]
3190
            elif st_groups[1] == "bridges":
3191
              val = [nic.bridge for nic in instance.nics]
3192
            else:
3193
              # index-based item
3194
              nic_idx = int(st_groups[2])
3195
              if nic_idx >= len(instance.nics):
3196
                val = None
3197
              else:
3198
                if st_groups[1] == "mac":
3199
                  val = instance.nics[nic_idx].mac
3200
                elif st_groups[1] == "ip":
3201
                  val = instance.nics[nic_idx].ip
3202
                elif st_groups[1] == "bridge":
3203
                  val = instance.nics[nic_idx].bridge
3204
                else:
3205
                  assert False, "Unhandled NIC parameter"
3206
          else:
3207
            assert False, "Unhandled variable parameter"
3208
        else:
3209
          raise errors.ParameterError(field)
3210
        iout.append(val)
3211
      output.append(iout)
3212

    
3213
    return output
3214

    
3215

    
3216
class LUFailoverInstance(LogicalUnit):
3217
  """Failover an instance.
3218

3219
  """
3220
  HPATH = "instance-failover"
3221
  HTYPE = constants.HTYPE_INSTANCE
3222
  _OP_REQP = ["instance_name", "ignore_consistency"]
3223
  REQ_BGL = False
3224

    
3225
  def ExpandNames(self):
3226
    self._ExpandAndLockInstance()
3227
    self.needed_locks[locking.LEVEL_NODE] = []
3228
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3229

    
3230
  def DeclareLocks(self, level):
3231
    if level == locking.LEVEL_NODE:
3232
      self._LockInstancesNodes()
3233

    
3234
  def BuildHooksEnv(self):
3235
    """Build hooks env.
3236

3237
    This runs on master, primary and secondary nodes of the instance.
3238

3239
    """
3240
    env = {
3241
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3242
      }
3243
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3244
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3245
    return env, nl, nl
3246

    
3247
  def CheckPrereq(self):
3248
    """Check prerequisites.
3249

3250
    This checks that the instance is in the cluster.
3251

3252
    """
3253
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3254
    assert self.instance is not None, \
3255
      "Cannot retrieve locked instance %s" % self.op.instance_name
3256

    
3257
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3258
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3259
      raise errors.OpPrereqError("Instance's disk layout is not"
3260
                                 " network mirrored, cannot failover.")
3261

    
3262
    secondary_nodes = instance.secondary_nodes
3263
    if not secondary_nodes:
3264
      raise errors.ProgrammerError("no secondary node but using "
3265
                                   "a mirrored disk template")
3266

    
3267
    target_node = secondary_nodes[0]
3268
    _CheckNodeOnline(self, target_node)
3269
    # check memory requirements on the secondary node
3270
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3271
                         instance.name, bep[constants.BE_MEMORY],
3272
                         instance.hypervisor)
3273

    
3274
    # check bridge existence
3275
    brlist = [nic.bridge for nic in instance.nics]
3276
    result = self.rpc.call_bridges_exist(target_node, brlist)
3277
    result.Raise()
3278
    if not result.data:
3279
      raise errors.OpPrereqError("One or more target bridges %s does not"
3280
                                 " exist on destination node '%s'" %
3281
                                 (brlist, target_node))
3282

    
3283
  def Exec(self, feedback_fn):
3284
    """Failover an instance.
3285

3286
    The failover is done by shutting it down on its present node and
3287
    starting it on the secondary.
3288

3289
    """
3290
    instance = self.instance
3291

    
3292
    source_node = instance.primary_node
3293
    target_node = instance.secondary_nodes[0]
3294

    
3295
    feedback_fn("* checking disk consistency between source and target")
3296
    for dev in instance.disks:
3297
      # for drbd, these are drbd over lvm
3298
      if not _CheckDiskConsistency(self, dev, target_node, False):
3299
        if instance.status == "up" and not self.op.ignore_consistency:
3300
          raise errors.OpExecError("Disk %s is degraded on target node,"
3301
                                   " aborting failover." % dev.iv_name)
3302

    
3303
    feedback_fn("* shutting down instance on source node")
3304
    logging.info("Shutting down instance %s on node %s",
3305
                 instance.name, source_node)
3306

    
3307
    result = self.rpc.call_instance_shutdown(source_node, instance)
3308
    if result.failed or not result.data:
3309
      if self.op.ignore_consistency:
3310
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3311
                             " Proceeding"
3312
                             " anyway. Please make sure node %s is down",
3313
                             instance.name, source_node, source_node)
3314
      else:
3315
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3316
                                 (instance.name, source_node))
3317

    
3318
    feedback_fn("* deactivating the instance's disks on source node")
3319
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3320
      raise errors.OpExecError("Can't shut down the instance's disks.")
3321

    
3322
    instance.primary_node = target_node
3323
    # distribute new instance config to the other nodes
3324
    self.cfg.Update(instance)
3325

    
3326
    # Only start the instance if it's marked as up
3327
    if instance.status == "up":
3328
      feedback_fn("* activating the instance's disks on target node")
3329
      logging.info("Starting instance %s on node %s",
3330
                   instance.name, target_node)
3331

    
3332
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3333
                                               ignore_secondaries=True)
3334
      if not disks_ok:
3335
        _ShutdownInstanceDisks(self, instance)
3336
        raise errors.OpExecError("Can't activate the instance's disks")
3337

    
3338
      feedback_fn("* starting the instance on the target node")
3339
      result = self.rpc.call_instance_start(target_node, instance, None)
3340
      if result.failed or not result.data:
3341
        _ShutdownInstanceDisks(self, instance)
3342
        raise errors.OpExecError("Could not start instance %s on node %s." %
3343
                                 (instance.name, target_node))
3344

    
3345

    
3346
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3347
  """Create a tree of block devices on the primary node.
3348

3349
  This always creates all devices.
3350

3351
  """
3352
  if device.children:
3353
    for child in device.children:
3354
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3355
        return False
3356

    
3357
  lu.cfg.SetDiskID(device, node)
3358
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3359
                                       instance.name, True, info)
3360
  if new_id.failed or not new_id.data:
3361
    return False
3362
  if device.physical_id is None:
3363
    device.physical_id = new_id
3364
  return True
3365

    
3366

    
3367
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3368
  """Create a tree of block devices on a secondary node.
3369

3370
  If this device type has to be created on secondaries, create it and
3371
  all its children.
3372

3373
  If not, just recurse to children keeping the same 'force' value.
3374

3375
  """
3376
  if device.CreateOnSecondary():
3377
    force = True
3378
  if device.children:
3379
    for child in device.children:
3380
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3381
                                        child, force, info):
3382
        return False
3383

    
3384
  if not force:
3385
    return True
3386
  lu.cfg.SetDiskID(device, node)
3387
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3388
                                       instance.name, False, info)
3389
  if new_id.failed or not new_id.data:
3390
    return False
3391
  if device.physical_id is None:
3392
    device.physical_id = new_id
3393
  return True
3394

    
3395

    
3396
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
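# Illustrative example: for an instance with two plain disks,
# _GenerateUniqueNames(lu, [".disk0", ".disk1"]) returns something like
# ["<unique-id>.disk0", "<unique-id>.disk1"], where the unique id is whatever
# opaque identifier lu.cfg.GenerateUniqueID() produces for this configuration.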

    
3408

    
3409
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
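# Rough sketch of the object tree built above (illustrative only): the
# returned disk is an LD_DRBD8 device whose logical_id carries the two node
# names, the allocated port, the two DRBD minors and the shared secret, and
# whose children are two LD_LV volumes in the cluster volume group -- one of
# the requested size for the data and a fixed 128 MB volume for the DRBD
# metadata.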

    
3429

    
3430
def _GenerateDiskTemplate(lu, template_name,
3431
                          instance_name, primary_node,
3432
                          secondary_nodes, disk_info,
3433
                          file_storage_dir, file_driver,
3434
                          base_index):
3435
  """Generate the entire disk layout for a given template type.
3436

3437
  """
3438
  #TODO: compute space requirements
3439

    
3440
  vgname = lu.cfg.GetVGName()
3441
  disk_count = len(disk_info)
3442
  disks = []
3443
  if template_name == constants.DT_DISKLESS:
3444
    pass
3445
  elif template_name == constants.DT_PLAIN:
3446
    if len(secondary_nodes) != 0:
3447
      raise errors.ProgrammerError("Wrong template configuration")
3448

    
3449
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3450
                                      for i in range(disk_count)])
3451
    for idx, disk in enumerate(disk_info):
3452
      disk_index = idx + base_index
3453
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3454
                              logical_id=(vgname, names[idx]),
3455
                              iv_name="disk/%d" % disk_index)
3456
      disks.append(disk_dev)
3457
  elif template_name == constants.DT_DRBD8:
3458
    if len(secondary_nodes) != 1:
3459
      raise errors.ProgrammerError("Wrong template configuration")
3460
    remote_node = secondary_nodes[0]
3461
    minors = lu.cfg.AllocateDRBDMinor(
3462
      [primary_node, remote_node] * len(disk_info), instance_name)
3463
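    # Note: the node list passed above interleaves primary and secondary for
    # each disk, so for N disks AllocateDRBDMinor returns 2*N minors; the
    # pairs are consumed below as minors[idx*2] (primary) and minors[idx*2+1]
    # (secondary).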

    
3464
    names = _GenerateUniqueNames(lu,
3465
                                 [".disk%d_%s" % (i, s)
3466
                                  for i in range(disk_count)
3467
                                  for s in ("data", "meta")
3468
                                  ])
3469
    for idx, disk in enumerate(disk_info):
3470
      disk_index = idx + base_index
3471
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3472
                                      disk["size"], names[idx*2:idx*2+2],
3473
                                      "disk/%d" % disk_index,
3474
                                      minors[idx*2], minors[idx*2+1])
3475
      disks.append(disk_dev)
3476
  elif template_name == constants.DT_FILE:
3477
    if len(secondary_nodes) != 0:
3478
      raise errors.ProgrammerError("Wrong template configuration")
3479

    
3480
    for idx, disk in enumerate(disk_info):
3481
      disk_index = idx + base_index
3482
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3483
                              iv_name="disk/%d" % disk_index,
3484
                              logical_id=(file_driver,
3485
                                          "%s/disk%d" % (file_storage_dir,
3486
                                                         idx)))
3487
      disks.append(disk_dev)
3488
  else:
3489
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3490
  return disks
3491

    
3492

    
3493
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
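# For example, an instance named "instance1.example.com" gets the metadata
# text "originstname+instance1.example.com" attached to its disks.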

    
3499

    
3500
def _CreateDisks(lu, instance):
3501
  """Create all disks for an instance.
3502

3503
  This abstracts away some work from AddInstance.
3504

3505
  @type lu: L{LogicalUnit}
3506
  @param lu: the logical unit on whose behalf we execute
3507
  @type instance: L{objects.Instance}
3508
  @param instance: the instance whose disks we should create
3509
  @rtype: boolean
3510
  @return: the success of the creation
3511

3512
  """
3513
  info = _GetInstanceInfoText(instance)
3514

    
3515
  if instance.disk_template == constants.DT_FILE:
3516
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3517
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3518
                                                 file_storage_dir)
3519

    
3520
    if result.failed or not result.data:
3521
      logging.error("Could not connect to node '%s'", instance.primary_node)
3522
      return False
3523

    
3524
    if not result.data[0]:
3525
      logging.error("Failed to create directory '%s'", file_storage_dir)
3526
      return False
3527

    
3528
  # Note: this needs to be kept in sync with adding of disks in
3529
  # LUSetInstanceParams
3530
  for device in instance.disks:
3531
    logging.info("Creating volume %s for instance %s",
3532
                 device.iv_name, instance.name)
3533
    #HARDCODE
3534
    for secondary_node in instance.secondary_nodes:
3535
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3536
                                        device, False, info):
3537
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3538
                      device.iv_name, device, secondary_node)
3539
        return False
3540
    #HARDCODE
3541
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3542
                                    instance, device, info):
3543
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3544
      return False
3545

    
3546
  return True
3547

    
3548

    
3549
def _RemoveDisks(lu, instance):
3550
  """Remove all disks for an instance.
3551

3552
  This abstracts away some work from `AddInstance()` and
3553
  `RemoveInstance()`. Note that in case some of the devices couldn't
3554
  be removed, the removal will continue with the other ones (compare
3555
  with `_CreateDisks()`).
3556

3557
  @type lu: L{LogicalUnit}
3558
  @param lu: the logical unit on whose behalf we execute
3559
  @type instance: L{objects.Instance}
3560
  @param instance: the instance whose disks we should remove
3561
  @rtype: boolean
3562
  @return: the success of the removal
3563

3564
  """
3565
  logging.info("Removing block devices for instance %s", instance.name)
3566

    
3567
  result = True
3568
  for device in instance.disks:
3569
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3570
      lu.cfg.SetDiskID(disk, node)
3571
      result = lu.rpc.call_blockdev_remove(node, disk)
3572
      if result.failed or not result.data:
3573
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3574
                           " continuing anyway", device.iv_name, node)
3575
        result = False
3576

    
3577
  if instance.disk_template == constants.DT_FILE:
3578
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3579
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3580
                                                 file_storage_dir)
3581
    if result.failed or not result.data:
3582
      logging.error("Could not remove directory '%s'", file_storage_dir)
3583
      result = False
3584

    
3585
  return result
3586

    
3587

    
3588
def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  # Required free disk space as a function of the disk template and disk sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
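# Worked example (illustrative): for a DT_DRBD8 instance with two disks of
# 1024 MB and 2048 MB, the required space in the volume group is
# (1024 + 128) + (2048 + 128) = 3328 MB on each of the two nodes; for
# DT_DISKLESS and DT_FILE no volume group space is needed, hence None.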

    
3607

    
3608
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    info.Raise()
    if not info.data or not isinstance(info.data, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info.data))
    if not info.data[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info.data[1])
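# Typical call, as used by LUCreateInstance.CheckPrereq further down:
#   _CheckHVParams(self, [pnode.name] + self.secondaries,
#                  self.op.hypervisor, self.op.hvparams)
# i.e. the hypervisor-specific validation is run on every node that will be
# involved with the new instance.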

    
3638

    
3639
class LUCreateInstance(LogicalUnit):
3640
  """Create an instance.
3641

3642
  """
3643
  HPATH = "instance-add"
3644
  HTYPE = constants.HTYPE_INSTANCE
3645
  _OP_REQP = ["instance_name", "disks", "disk_template",
3646
              "mode", "start",
3647
              "wait_for_sync", "ip_check", "nics",
3648
              "hvparams", "beparams"]
3649
  REQ_BGL = False
3650

    
3651
  def _ExpandNode(self, node):
3652
    """Expands and checks one node name.
3653

3654
    """
3655
    node_full = self.cfg.ExpandNodeName(node)
3656
    if node_full is None:
3657
      raise errors.OpPrereqError("Unknown node %s" % node)
3658
    return node_full
3659

    
3660
  def ExpandNames(self):
3661
    """ExpandNames for CreateInstance.
3662

3663
    Figure out the right locks for instance creation.
3664

3665
    """
3666
    self.needed_locks = {}
3667

    
3668
    # set optional parameters to none if they don't exist
3669
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3670
      if not hasattr(self.op, attr):
3671
        setattr(self.op, attr, None)
3672

    
3673
    # cheap checks, mostly valid constants given
3674

    
3675
    # verify creation mode
3676
    if self.op.mode not in (constants.INSTANCE_CREATE,
3677
                            constants.INSTANCE_IMPORT):
3678
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3679
                                 self.op.mode)
3680

    
3681
    # disk template and mirror node verification
3682
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3683
      raise errors.OpPrereqError("Invalid disk template name")
3684

    
3685
    if self.op.hypervisor is None:
3686
      self.op.hypervisor = self.cfg.GetHypervisorType()
3687

    
3688
    cluster = self.cfg.GetClusterInfo()
3689
    enabled_hvs = cluster.enabled_hypervisors
3690
    if self.op.hypervisor not in enabled_hvs:
3691
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3692
                                 " cluster (%s)" % (self.op.hypervisor,
3693
                                  ",".join(enabled_hvs)))
3694

    
3695
    # check hypervisor parameter syntax (locally)
3696

    
3697
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3698
                                  self.op.hvparams)
3699
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3700
    hv_type.CheckParameterSyntax(filled_hvp)
3701

    
3702
    # fill and remember the beparams dict
3703
    utils.CheckBEParams(self.op.beparams)
3704
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3705
                                    self.op.beparams)
3706

    
3707
    #### instance parameters check
3708

    
3709
    # instance name verification
3710
    hostname1 = utils.HostInfo(self.op.instance_name)
3711
    self.op.instance_name = instance_name = hostname1.name
3712

    
3713
    # this is just a preventive check, but someone might still add this
3714
    # instance in the meantime, and creation will fail at lock-add time
3715
    if instance_name in self.cfg.GetInstanceList():
3716
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3717
                                 instance_name)
3718

    
3719
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3720

    
3721
    # NIC buildup
3722
    self.nics = []
3723
    for nic in self.op.nics:
3724
      # ip validity checks
3725
      ip = nic.get("ip", None)
3726
      if ip is None or ip.lower() == "none":
3727
        nic_ip = None
3728
      elif ip.lower() == constants.VALUE_AUTO:
3729
        nic_ip = hostname1.ip
3730
      else:
3731
        if not utils.IsValidIP(ip):
3732
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3733
                                     " like a valid IP" % ip)
3734
        nic_ip = ip
3735

    
3736
      # MAC address verification
3737
      mac = nic.get("mac", constants.VALUE_AUTO)
3738
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3739
        if not utils.IsValidMac(mac.lower()):
3740
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3741
                                     mac)
3742
      # bridge verification
3743
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3744
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3745

    
3746
    # disk checks/pre-build
3747
    self.disks = []
3748
    for disk in self.op.disks:
3749
      mode = disk.get("mode", constants.DISK_RDWR)
3750
      if mode not in constants.DISK_ACCESS_SET:
3751
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3752
                                   mode)
3753
      size = disk.get("size", None)
3754
      if size is None:
3755
        raise errors.OpPrereqError("Missing disk size")
3756
      try:
3757
        size = int(size)
3758
      except ValueError:
3759
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3760
      self.disks.append({"size": size, "mode": mode})
3761

    
3762
    # used in CheckPrereq for ip ping check
3763
    self.check_ip = hostname1.ip
3764

    
3765
    # file storage checks
3766
    if (self.op.file_driver and
3767
        not self.op.file_driver in constants.FILE_DRIVER):
3768
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3769
                                 self.op.file_driver)
3770

    
3771
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3772
      raise errors.OpPrereqError("File storage directory path not absolute")
3773

    
3774
    ### Node/iallocator related checks
3775
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3776
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3777
                                 " node must be given")
3778

    
3779
    if self.op.iallocator:
3780
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3781
    else:
3782
      self.op.pnode = self._ExpandNode(self.op.pnode)
3783
      nodelist = [self.op.pnode]
3784
      if self.op.snode is not None:
3785
        self.op.snode = self._ExpandNode(self.op.snode)
3786
        nodelist.append(self.op.snode)
3787
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3788

    
3789
    # in case of import lock the source node too
3790
    if self.op.mode == constants.INSTANCE_IMPORT:
3791
      src_node = getattr(self.op, "src_node", None)
3792
      src_path = getattr(self.op, "src_path", None)
3793

    
3794
      if src_path is None:
3795
        self.op.src_path = src_path = self.op.instance_name
3796

    
3797
      if src_node is None:
3798
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3799
        self.op.src_node = None
3800
        if os.path.isabs(src_path):
3801
          raise errors.OpPrereqError("Importing an instance from an absolute"
3802
                                     " path requires a source node option.")
3803
      else:
3804
        self.op.src_node = src_node = self._ExpandNode(src_node)
3805
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3806
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
3807
        if not os.path.isabs(src_path):
3808
          self.op.src_path = src_path = \
3809
            os.path.join(constants.EXPORT_DIR, src_path)
3810

    
3811
    else: # INSTANCE_CREATE
3812
      if getattr(self.op, "os_type", None) is None:
3813
        raise errors.OpPrereqError("No guest OS specified")
3814

    
3815
  def _RunAllocator(self):
3816
    """Run the allocator based on input opcode.
3817

3818
    """
3819
    nics = [n.ToDict() for n in self.nics]
3820
    ial = IAllocator(self,
3821
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3822
                     name=self.op.instance_name,
3823
                     disk_template=self.op.disk_template,
3824
                     tags=[],
3825
                     os=self.op.os_type,
3826
                     vcpus=self.be_full[constants.BE_VCPUS],
3827
                     mem_size=self.be_full[constants.BE_MEMORY],
3828
                     disks=self.disks,
3829
                     nics=nics,
3830
                     hypervisor=self.op.hypervisor,
3831
                     )
3832

    
3833
    ial.Run(self.op.iallocator)
3834

    
3835
    if not ial.success:
3836
      raise errors.OpPrereqError("Can't compute nodes using"
3837
                                 " iallocator '%s': %s" % (self.op.iallocator,
3838
                                                           ial.info))
3839
    if len(ial.nodes) != ial.required_nodes:
3840
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3841
                                 " of nodes (%s), required %s" %
3842
                                 (self.op.iallocator, len(ial.nodes),
3843
                                  ial.required_nodes))
3844
    self.op.pnode = ial.nodes[0]
3845
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3846
                 self.op.instance_name, self.op.iallocator,
3847
                 ", ".join(ial.nodes))
3848
    if ial.required_nodes == 2:
3849
      self.op.snode = ial.nodes[1]
3850

    
3851
  def BuildHooksEnv(self):
3852
    """Build hooks env.
3853

3854
    This runs on master, primary and secondary nodes of the instance.
3855

3856
    """
3857
    env = {
3858
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3859
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3860
      "INSTANCE_ADD_MODE": self.op.mode,
3861
      }
3862
    if self.op.mode == constants.INSTANCE_IMPORT:
3863
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3864
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3865
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3866

    
3867
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3868
      primary_node=self.op.pnode,
3869
      secondary_nodes=self.secondaries,
3870
      status=self.instance_status,
3871
      os_type=self.op.os_type,
3872
      memory=self.be_full[constants.BE_MEMORY],
3873
      vcpus=self.be_full[constants.BE_VCPUS],
3874
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3875
    ))
3876

    
3877
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3878
          self.secondaries)
3879
    return env, nl, nl
3880

    
3881

    
3882
  def CheckPrereq(self):
3883
    """Check prerequisites.
3884

3885
    """
3886
    if (not self.cfg.GetVGName() and
3887
        self.op.disk_template not in constants.DTS_NOT_LVM):
3888
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3889
                                 " instances")
3890

    
3891

    
3892
    if self.op.mode == constants.INSTANCE_IMPORT:
3893
      src_node = self.op.src_node
3894
      src_path = self.op.src_path
3895

    
3896
      if src_node is None:
3897
        exp_list = self.rpc.call_export_list(
3898
          self.acquired_locks[locking.LEVEL_NODE])
3899
        found = False
3900
        for node in exp_list:
3901
          if not exp_list[node].failed and src_path in exp_list[node].data:
3902
            found = True
3903
            self.op.src_node = src_node = node
3904
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3905
                                                       src_path)
3906
            break
3907
        if not found:
3908
          raise errors.OpPrereqError("No export found for relative path %s" %
3909
                                      src_path)
3910

    
3911
      _CheckNodeOnline(self, src_node)
3912
      result = self.rpc.call_export_info(src_node, src_path)
3913
      result.Raise()
3914
      if not result.data:
3915
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3916

    
3917
      export_info = result.data
3918
      if not export_info.has_section(constants.INISECT_EXP):
3919
        raise errors.ProgrammerError("Corrupted export config")
3920

    
3921
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3922
      if (int(ei_version) != constants.EXPORT_VERSION):
3923
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3924
                                   (ei_version, constants.EXPORT_VERSION))
3925

    
3926
      # Check that the new instance doesn't have less disks than the export
3927
      instance_disks = len(self.disks)
3928
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3929
      if instance_disks < export_disks:
3930
        raise errors.OpPrereqError("Not enough disks to import."
3931
                                   " (instance: %d, export: %d)" %
3932
                                   (instance_disks, export_disks))
3933

    
3934
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3935
      disk_images = []
3936
      for idx in range(export_disks):
3937
        option = 'disk%d_dump' % idx
3938
        if export_info.has_option(constants.INISECT_INS, option):
3939
          # FIXME: are the old os-es, disk sizes, etc. useful?
3940
          export_name = export_info.get(constants.INISECT_INS, option)
3941
          image = os.path.join(src_path, export_name)
3942
          disk_images.append(image)
3943
        else:
3944
          disk_images.append(False)
3945

    
3946
      self.src_images = disk_images
3947

    
3948
      old_name = export_info.get(constants.INISECT_INS, 'name')
3949
      # FIXME: int() here could throw a ValueError on broken exports
3950
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3951
      if self.op.instance_name == old_name:
3952
        for idx, nic in enumerate(self.nics):
3953
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3954
            nic_mac_ini = 'nic%d_mac' % idx
3955
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3956

    
3957
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3958
    if self.op.start and not self.op.ip_check:
3959
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3960
                                 " adding an instance in start mode")
3961

    
3962
    if self.op.ip_check:
3963
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3964
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3965
                                   (self.check_ip, self.op.instance_name))
3966

    
3967
    #### allocator run
3968

    
3969
    if self.op.iallocator is not None:
3970
      self._RunAllocator()
3971

    
3972
    #### node related checks
3973

    
3974
    # check primary node
3975
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3976
    assert self.pnode is not None, \
3977
      "Cannot retrieve locked node %s" % self.op.pnode
3978
    if pnode.offline:
3979
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
3980
                                 pnode.name)
3981

    
3982
    self.secondaries = []
3983

    
3984
    # mirror node verification
3985
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3986
      if self.op.snode is None:
3987
        raise errors.OpPrereqError("The networked disk templates need"
3988
                                   " a mirror node")
3989
      if self.op.snode == pnode.name:
3990
        raise errors.OpPrereqError("The secondary node cannot be"
3991
                                   " the primary node.")
3992
      self.secondaries.append(self.op.snode)
3993
      _CheckNodeOnline(self, self.op.snode)
3994

    
3995
    nodenames = [pnode.name] + self.secondaries
3996

    
3997
    req_size = _ComputeDiskSize(self.op.disk_template,
3998
                                self.disks)
3999

    
4000
    # Check lv size requirements
4001
    if req_size is not None:
4002
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4003
                                         self.op.hypervisor)
4004
      for node in nodenames:
4005
        info = nodeinfo[node]
4006
        info.Raise()
4007
        info = info.data
4008
        if not info:
4009
          raise errors.OpPrereqError("Cannot get current information"
4010
                                     " from node '%s'" % node)
4011
        vg_free = info.get('vg_free', None)
4012
        if not isinstance(vg_free, int):
4013
          raise errors.OpPrereqError("Can't compute free disk space on"
4014
                                     " node %s" % node)
4015
        if req_size > info['vg_free']:
4016
          raise errors.OpPrereqError("Not enough disk space on target node %s."
4017
                                     " %d MB available, %d MB required" %
4018
                                     (node, info['vg_free'], req_size))
4019

    
4020
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4021

    
4022
    # os verification
4023
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4024
    result.Raise()
4025
    if not isinstance(result.data, objects.OS):
4026
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
4027
                                 " primary node"  % self.op.os_type)
4028

    
4029
    # bridge check on primary node
4030
    bridges = [n.bridge for n in self.nics]
4031
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4032
    result.Raise()
4033
    if not result.data:
4034
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
4035
                                 " exist on destination node '%s'" %
4036
                                 (",".join(bridges), pnode.name))
4037

    
4038
    # memory check on primary node
4039
    if self.op.start:
4040
      _CheckNodeFreeMemory(self, self.pnode.name,
4041
                           "creating instance %s" % self.op.instance_name,
4042
                           self.be_full[constants.BE_MEMORY],
4043
                           self.op.hypervisor)
4044

    
4045
    if self.op.start:
4046
      self.instance_status = 'up'
4047
    else:
4048
      self.instance_status = 'down'
4049

    
4050
  def Exec(self, feedback_fn):
4051
    """Create and add the instance to the cluster.
4052

4053
    """
4054
    instance = self.op.instance_name
4055
    pnode_name = self.pnode.name
4056

    
4057
    for nic in self.nics:
4058
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4059
        nic.mac = self.cfg.GenerateMAC()
4060

    
4061
    ht_kind = self.op.hypervisor
4062
    if ht_kind in constants.HTS_REQ_PORT:
4063
      network_port = self.cfg.AllocatePort()
4064
    else:
4065
      network_port = None
4066

    
4067
    ##if self.op.vnc_bind_address is None:
4068
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4069

    
4070
    # this is needed because os.path.join does not accept None arguments
4071
    if self.op.file_storage_dir is None:
4072
      string_file_storage_dir = ""
4073
    else:
4074
      string_file_storage_dir = self.op.file_storage_dir
4075

    
4076
    # build the full file storage dir path
4077
    file_storage_dir = os.path.normpath(os.path.join(
4078
                                        self.cfg.GetFileStorageDir(),
4079
                                        string_file_storage_dir, instance))
4080

    
4081

    
4082
    disks = _GenerateDiskTemplate(self,
4083
                                  self.op.disk_template,
4084
                                  instance, pnode_name,
4085
                                  self.secondaries,
4086
                                  self.disks,
4087
                                  file_storage_dir,
4088
                                  self.op.file_driver,
4089
                                  0)
4090

    
4091
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4092
                            primary_node=pnode_name,
4093
                            nics=self.nics, disks=disks,
4094
                            disk_template=self.op.disk_template,
4095
                            status=self.instance_status,
4096
                            network_port=network_port,
4097
                            beparams=self.op.beparams,
4098
                            hvparams=self.op.hvparams,
4099
                            hypervisor=self.op.hypervisor,
4100
                            )
4101

    
4102
    feedback_fn("* creating instance disks...")
4103
    if not _CreateDisks(self, iobj):
4104
      _RemoveDisks(self, iobj)
4105
      self.cfg.ReleaseDRBDMinors(instance)
4106
      raise errors.OpExecError("Device creation failed, reverting...")
4107

    
4108
    feedback_fn("adding instance %s to cluster config" % instance)
4109

    
4110
    self.cfg.AddInstance(iobj)
4111
    # Declare that we don't want to remove the instance lock anymore, as we've
4112
    # added the instance to the config
4113
    del self.remove_locks[locking.LEVEL_INSTANCE]
4114
    # Remove the temp. assignments for the instance's drbds
4115
    self.cfg.ReleaseDRBDMinors(instance)
4116
    # Unlock all the nodes
4117
    if self.op.mode == constants.INSTANCE_IMPORT:
4118
      nodes_keep = [self.op.src_node]
4119
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4120
                       if node != self.op.src_node]
4121
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4122
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4123
    else:
4124
      self.context.glm.release(locking.LEVEL_NODE)
4125
      del self.acquired_locks[locking.LEVEL_NODE]
4126

    
4127
    if self.op.wait_for_sync:
4128
      disk_abort = not _WaitForSync(self, iobj)
4129
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4130
      # make sure the disks are not degraded (still sync-ing is ok)
4131
      time.sleep(15)
4132
      feedback_fn("* checking mirrors status")
4133
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4134
    else:
4135
      disk_abort = False
4136

    
4137
    if disk_abort:
4138
      _RemoveDisks(self, iobj)
4139
      self.cfg.RemoveInstance(iobj.name)
4140
      # Make sure the instance lock gets removed
4141
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4142
      raise errors.OpExecError("There are some degraded disks for"
4143
                               " this instance")
4144

    
4145
    feedback_fn("creating os for instance %s on node %s" %
4146
                (instance, pnode_name))
4147

    
4148
    if iobj.disk_template != constants.DT_DISKLESS:
4149
      if self.op.mode == constants.INSTANCE_CREATE:
4150
        feedback_fn("* running the instance OS create scripts...")
4151
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4152
        result.Raise()
4153
        if not result.data:
4154
          raise errors.OpExecError("Could not add os for instance %s"
4155
                                   " on node %s" %
4156
                                   (instance, pnode_name))
4157

    
4158
      elif self.op.mode == constants.INSTANCE_IMPORT:
4159
        feedback_fn("* running the instance OS import scripts...")
4160
        src_node = self.op.src_node
4161
        src_images = self.src_images
4162
        cluster_name = self.cfg.GetClusterName()
4163
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4164
                                                         src_node, src_images,
4165
                                                         cluster_name)
4166
        import_result.Raise()
4167
        for idx, result in enumerate(import_result.data):
4168
          if not result:
4169
            self.LogWarning("Could not import the image %s for instance"
4170
                            " %s, disk %d, on node %s" %
4171
                            (src_images[idx], instance, idx, pnode_name))
4172
      else:
4173
        # also checked in the prereq part
4174
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4175
                                     % self.op.mode)
4176

    
4177
    if self.op.start:
4178
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4179
      feedback_fn("* starting instance...")
4180
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
4181
      result.Raise()
4182
      if not result.data:
4183
        raise errors.OpExecError("Could not start instance")
4184

    
4185

    
4186
class LUConnectConsole(NoHooksLU):
4187
  """Connect to an instance's console.
4188

4189
  This is somewhat special in that it returns the command line that
4190
  you need to run on the master node in order to connect to the
4191
  console.
4192

4193
  """
4194
  _OP_REQP = ["instance_name"]
4195
  REQ_BGL = False
4196

    
4197
  def ExpandNames(self):
4198
    self._ExpandAndLockInstance()
4199

    
4200
  def CheckPrereq(self):
4201
    """Check prerequisites.
4202

4203
    This checks that the instance is in the cluster.
4204

4205
    """
4206
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4207
    assert self.instance is not None, \
4208
      "Cannot retrieve locked instance %s" % self.op.instance_name
4209
    _CheckNodeOnline(self, self.instance.primary_node)
4210

    
4211
  def Exec(self, feedback_fn):
4212
    """Connect to the console of an instance
4213

4214
    """
4215
    instance = self.instance
4216
    node = instance.primary_node
4217

    
4218
    node_insts = self.rpc.call_instance_list([node],
4219
                                             [instance.hypervisor])[node]
4220
    node_insts.Raise()
4221

    
4222
    if instance.name not in node_insts.data:
4223
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4224

    
4225
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4226

    
4227
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4228
    console_cmd = hyper.GetShellCommandForConsole(instance)
4229

    
4230
    # build ssh cmdline
4231
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4232
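    # The returned command line is not executed here; as the class docstring
    # notes, the client runs it on the master node itself.  It is an ssh to
    # the primary node wrapping whatever GetShellCommandForConsole returns for
    # the instance's hypervisor (for Xen, for instance, something along the
    # lines of "xm console <instance>"; the exact string is
    # hypervisor-dependent).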

    
4233

    
4234
class LUReplaceDisks(LogicalUnit):
4235
  """Replace the disks of an instance.
4236

4237
  """
4238
  HPATH = "mirrors-replace"
4239
  HTYPE = constants.HTYPE_INSTANCE
4240
  _OP_REQP = ["instance_name", "mode", "disks"]
4241
  REQ_BGL = False
4242

    
4243
  def ExpandNames(self):
4244
    self._ExpandAndLockInstance()
4245

    
4246
    if not hasattr(self.op, "remote_node"):
4247
      self.op.remote_node = None
4248

    
4249
    ia_name = getattr(self.op, "iallocator", None)
4250
    if ia_name is not None:
4251
      if self.op.remote_node is not None:
4252
        raise errors.OpPrereqError("Give either the iallocator or the new"
4253
                                   " secondary, not both")
4254
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4255
    elif self.op.remote_node is not None:
4256
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4257
      if remote_node is None:
4258
        raise errors.OpPrereqError("Node '%s' not known" %
4259
                                   self.op.remote_node)
4260
      self.op.remote_node = remote_node
4261
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4262
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4263
    else:
4264
      self.needed_locks[locking.LEVEL_NODE] = []
4265
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4266

    
4267
  def DeclareLocks(self, level):
4268
    # If we're not already locking all nodes in the set we have to declare the
4269
    # instance's primary/secondary nodes.
4270
    if (level == locking.LEVEL_NODE and
4271
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4272
      self._LockInstancesNodes()
4273

    
4274
  def _RunAllocator(self):
4275
    """Compute a new secondary node using an IAllocator.
4276

4277
    """
4278
    ial = IAllocator(self,
4279
                     mode=constants.IALLOCATOR_MODE_RELOC,
4280
                     name=self.op.instance_name,
4281
                     relocate_from=[self.sec_node])
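    # in relocate mode the allocator is asked to pick a replacement for
    # the current secondary (self.sec_node); it is expected to return
    # exactly ial.required_nodes node names, which is checked below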
4282

    
4283
    ial.Run(self.op.iallocator)
4284

    
4285
    if not ial.success:
4286
      raise errors.OpPrereqError("Can't compute nodes using"
4287
                                 " iallocator '%s': %s" % (self.op.iallocator,
4288
                                                           ial.info))
4289
    if len(ial.nodes) != ial.required_nodes:
4290
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4291
                                 " of nodes (%s), required %s" %
4292
                                 (len(ial.nodes), ial.required_nodes))
4293
    self.op.remote_node = ial.nodes[0]
4294
    self.LogInfo("Selected new secondary for the instance: %s",
4295
                 self.op.remote_node)
4296

    
4297
  def BuildHooksEnv(self):
4298
    """Build hooks env.
4299

4300
    This runs on the master, the primary and all the secondaries.
4301

4302
    """
4303
    env = {
4304
      "MODE": self.op.mode,
4305
      "NEW_SECONDARY": self.op.remote_node,
4306
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4307
      }
4308
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4309
    nl = [
4310
      self.cfg.GetMasterNode(),
4311
      self.instance.primary_node,
4312
      ]
4313
    if self.op.remote_node is not None:
4314
      nl.append(self.op.remote_node)
4315
    return env, nl, nl
4316

    
4317
  def CheckPrereq(self):
4318
    """Check prerequisites.
4319

4320
    This checks that the instance is in the cluster.
4321

4322
    """
4323
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4324
    assert instance is not None, \
4325
      "Cannot retrieve locked instance %s" % self.op.instance_name
4326
    self.instance = instance
4327

    
4328
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4329
      raise errors.OpPrereqError("Instance's disk layout is not"
4330
                                 " network mirrored.")
4331

    
4332
    if len(instance.secondary_nodes) != 1:
4333
      raise errors.OpPrereqError("The instance has a strange layout,"
4334
                                 " expected one secondary but found %d" %
4335
                                 len(instance.secondary_nodes))
4336

    
4337
    self.sec_node = instance.secondary_nodes[0]
4338

    
4339
    ia_name = getattr(self.op, "iallocator", None)
4340
    if ia_name is not None:
4341
      self._RunAllocator()
4342

    
4343
    remote_node = self.op.remote_node
4344
    if remote_node is not None:
4345
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4346
      assert self.remote_node_info is not None, \
4347
        "Cannot retrieve locked node %s" % remote_node
4348
    else:
4349
      self.remote_node_info = None
4350
    if remote_node == instance.primary_node:
4351
      raise errors.OpPrereqError("The specified node is the primary node of"
4352
                                 " the instance.")
4353
    elif remote_node == self.sec_node:
4354
      if self.op.mode == constants.REPLACE_DISK_SEC:
4355
        # this is for DRBD8, where we can't execute the same mode of
4356
        # replacement as for drbd7 (no different port allocated)
4357
        raise errors.OpPrereqError("Same secondary given, cannot execute"
4358
                                   " replacement")
4359
    if instance.disk_template == constants.DT_DRBD8:
4360
      if (self.op.mode == constants.REPLACE_DISK_ALL and
4361
          remote_node is not None):
4362
        # switch to replace secondary mode
4363
        self.op.mode = constants.REPLACE_DISK_SEC
4364

    
4365
      if self.op.mode == constants.REPLACE_DISK_ALL:
4366
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4367
                                   " secondary disk replacement, not"
4368
                                   " both at once")
4369
      elif self.op.mode == constants.REPLACE_DISK_PRI:
4370
        if remote_node is not None:
4371
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4372
                                     " the secondary while doing a primary"
4373
                                     " node disk replacement")
4374
        self.tgt_node = instance.primary_node
4375
        self.oth_node = instance.secondary_nodes[0]
4376
        _CheckNodeOnline(self, self.tgt_node)
4377
        _CheckNodeOnline(self, self.oth_node)
4378
      elif self.op.mode == constants.REPLACE_DISK_SEC:
4379
        self.new_node = remote_node # this can be None, in which case
4380
                                    # we don't change the secondary
4381
        self.tgt_node = instance.secondary_nodes[0]
4382
        self.oth_node = instance.primary_node
4383
        _CheckNodeOnline(self, self.oth_node)
4384
        if self.new_node is not None:
4385
          _CheckNodeOnline(self, self.new_node)
4386
        else:
4387
          _CheckNodeOnline(self, self.tgt_node)
4388
      else:
4389
        raise errors.ProgrammerError("Unhandled disk replace mode")
4390

    
4391
    if not self.op.disks:
4392
      self.op.disks = range(len(instance.disks))
4393

    
4394
    for disk_idx in self.op.disks:
4395
      instance.FindDisk(disk_idx)
4396

    
4397
  def _ExecD8DiskOnly(self, feedback_fn):
4398
    """Replace a disk on the primary or secondary for dbrd8.
4399

4400
    The algorithm for replace is quite complicated:
4401

4402
      1. for each disk to be replaced:
4403

4404
        1. create new LVs on the target node with unique names
4405
        1. detach old LVs from the drbd device
4406
        1. rename old LVs to name_replaced.<time_t>
4407
        1. rename new LVs to old LVs
4408
        1. attach the new LVs (with the old names now) to the drbd device
4409

4410
      1. wait for sync across all devices
4411

4412
      1. for each modified disk:
4413

4414
        1. remove old LVs (which have the name name_replaced.<time_t>)
4415

4416
    Failures are not very well handled.
4417

4418
    """
4419
    steps_total = 6
4420
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4421
    instance = self.instance
4422
    iv_names = {}
4423
    vgname = self.cfg.GetVGName()
4424
    # start of work
4425
    cfg = self.cfg
4426
    tgt_node = self.tgt_node
4427
    oth_node = self.oth_node
4428

    
4429
    # Step: check device activation
4430
    self.proc.LogStep(1, steps_total, "check device existence")
4431
    info("checking volume groups")
4432
    my_vg = cfg.GetVGName()
4433
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4434
    if not results:
4435
      raise errors.OpExecError("Can't list volume groups on the nodes")
4436
    for node in oth_node, tgt_node:
4437
      res = results[node]
4438
      if res.failed or not res.data or my_vg not in res.data:
4439
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4440
                                 (my_vg, node))
4441
    for idx, dev in enumerate(instance.disks):
4442
      if idx not in self.op.disks:
4443
        continue
4444
      for node in tgt_node, oth_node:
4445
        info("checking disk/%d on %s" % (idx, node))
4446
        cfg.SetDiskID(dev, node)
4447
        if not self.rpc.call_blockdev_find(node, dev):
4448
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4449
                                   (idx, node))
4450

    
4451
    # Step: check other node consistency
4452
    self.proc.LogStep(2, steps_total, "check peer consistency")
4453
    for idx, dev in enumerate(instance.disks):
4454
      if idx not in self.op.disks:
4455
        continue
4456
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4457
      if not _CheckDiskConsistency(self, dev, oth_node,
4458
                                   oth_node==instance.primary_node):
4459
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4460
                                 " to replace disks on this node (%s)" %
4461
                                 (oth_node, tgt_node))
4462

    
4463
    # Step: create new storage
4464
    self.proc.LogStep(3, steps_total, "allocate new storage")
4465
    for idx, dev in enumerate(instance.disks):
4466
      if idx not in self.op.disks:
4467
        continue
4468
      size = dev.size
4469
      cfg.SetDiskID(dev, tgt_node)
4470
      lv_names = [".disk%d_%s" % (idx, suf)
4471
                  for suf in ["data", "meta"]]
4472
      names = _GenerateUniqueNames(self, lv_names)
4473
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4474
                             logical_id=(vgname, names[0]))
4475
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4476
                             logical_id=(vgname, names[1]))
4477
      new_lvs = [lv_data, lv_meta]
4478
      old_lvs = dev.children
4479
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
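      # remember, per drbd device, the (device, old LVs, new LVs) triple;
      # the detach/rename/attach, sync-check and cleanup steps below all
      # iterate over this mapping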
4480
      info("creating new local storage on %s for %s" %
4481
           (tgt_node, dev.iv_name))
4482
      # since we *always* want to create this LV, we use the
4483
      # _Create...OnPrimary (which forces the creation), even if we
4484
      # are talking about the secondary node
4485
      for new_lv in new_lvs:
4486
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4487
                                        _GetInstanceInfoText(instance)):
4488
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4489
                                   " node '%s'" %
4490
                                   (new_lv.logical_id[1], tgt_node))
4491

    
4492
    # Step: for each lv, detach+rename*2+attach
4493
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4494
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4495
      info("detaching %s drbd from local storage" % dev.iv_name)
4496
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4497
      result.Raise()
4498
      if not result.data:
4499
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4500
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4501
      #dev.children = []
4502
      #cfg.Update(instance)
4503

    
4504
      # ok, we created the new LVs, so now we know we have the needed
4505
      # storage; as such, we proceed on the target node to rename
4506
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4507
      # using the assumption that logical_id == physical_id (which in
4508
      # turn is the unique_id on that node)
4509

    
4510
      # FIXME(iustin): use a better name for the replaced LVs
4511
      temp_suffix = int(time.time())
4512
      ren_fn = lambda d, suff: (d.physical_id[0],
4513
                                d.physical_id[1] + "_replaced-%s" % suff)
4514
      # build the rename list based on what LVs exist on the node
4515
      rlist = []
4516
      for to_ren in old_lvs:
4517
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4518
        if not find_res.failed and find_res.data is not None: # device exists
4519
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4520

    
4521
      info("renaming the old LVs on the target node")
4522
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4523
      result.Raise()
4524
      if not result.data:
4525
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4526
      # now we rename the new LVs to the old LVs
4527
      info("renaming the new LVs on the target node")
4528
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4529
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4530
      result.Raise()
4531
      if not result.data:
4532
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4533

    
4534
      for old, new in zip(old_lvs, new_lvs):
4535
        new.logical_id = old.logical_id
4536
        cfg.SetDiskID(new, tgt_node)
4537

    
4538
      for disk in old_lvs:
4539
        disk.logical_id = ren_fn(disk, temp_suffix)
4540
        cfg.SetDiskID(disk, tgt_node)
4541

    
4542
      # now that the new lvs have the old name, we can add them to the device
4543
      info("adding new mirror component on %s" % tgt_node)
4544
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4545
      if result.failed or not result.data:
4546
        for new_lv in new_lvs:
4547
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4548
          if result.failed or not result.data:
4549
            warning("Can't rollback device %s", hint="manually cleanup unused"
4550
                    " logical volumes")
4551
        raise errors.OpExecError("Can't add local storage to drbd")
4552

    
4553
      dev.children = new_lvs
4554
      cfg.Update(instance)
4555

    
4556
    # Step: wait for sync
4557

    
4558
    # this can fail as the old devices are degraded and _WaitForSync
4559
    # does a combined result over all disks, so we don't check its
4560
    # return value
4561
    self.proc.LogStep(5, steps_total, "sync devices")
4562
    _WaitForSync(self, instance, unlock=True)
4563

    
4564
    # so check manually all the devices
4565
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4566
      cfg.SetDiskID(dev, instance.primary_node)
4567
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4568
      if result.failed or result.data[5]:
4569
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4570

    
4571
    # Step: remove old storage
4572
    self.proc.LogStep(6, steps_total, "removing old storage")
4573
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4574
      info("remove logical volumes for %s" % name)
4575
      for lv in old_lvs:
4576
        cfg.SetDiskID(lv, tgt_node)
4577
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
4578
        if result.failed or not result.data:
4579
          warning("Can't remove old LV", hint="manually remove unused LVs")
4580
          continue
4581

    
4582
  def _ExecD8Secondary(self, feedback_fn):
4583
    """Replace the secondary node for drbd8.
4584

4585
    The algorithm for replace is quite complicated:
4586
      - for all disks of the instance:
4587
        - create new LVs on the new node with same names
4588
        - shutdown the drbd device on the old secondary
4589
        - disconnect the drbd network on the primary
4590
        - create the drbd device on the new secondary
4591
        - network attach the drbd on the primary, using an artifice:
4592
          the drbd code for Attach() will connect to the network if it
4593
          finds a device which is connected to the good local disks but
4594
          not network enabled
4595
      - wait for sync across all devices
4596
      - remove all disks from the old secondary
4597

4598
    Failures are not very well handled.
4599

4600
    """
4601
    steps_total = 6
4602
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4603
    instance = self.instance
4604
    iv_names = {}
4605
    vgname = self.cfg.GetVGName()
4606
    # start of work
4607
    cfg = self.cfg
4608
    old_node = self.tgt_node
4609
    new_node = self.new_node
4610
    pri_node = instance.primary_node
4611

    
4612
    # Step: check device activation
4613
    self.proc.LogStep(1, steps_total, "check device existence")
4614
    info("checking volume groups")
4615
    my_vg = cfg.GetVGName()
4616
    results = self.rpc.call_vg_list([pri_node, new_node])
4617
    for node in pri_node, new_node:
4618
      res = results[node]
4619
      if res.failed or not res.data or my_vg not in res.data:
4620
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4621
                                 (my_vg, node))
4622
    for idx, dev in enumerate(instance.disks):
4623
      if idx not in self.op.disks:
4624
        continue
4625
      info("checking disk/%d on %s" % (idx, pri_node))
4626
      cfg.SetDiskID(dev, pri_node)
4627
      result = self.rpc.call_blockdev_find(pri_node, dev)
4628
      result.Raise()
4629
      if not result.data:
4630
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4631
                                 (idx, pri_node))
4632

    
4633
    # Step: check other node consistency
4634
    self.proc.LogStep(2, steps_total, "check peer consistency")
4635
    for idx, dev in enumerate(instance.disks):
4636
      if idx not in self.op.disks:
4637
        continue
4638
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4639
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4640
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4641
                                 " unsafe to replace the secondary" %
4642
                                 pri_node)
4643

    
4644
    # Step: create new storage
4645
    self.proc.LogStep(3, steps_total, "allocate new storage")
4646
    for idx, dev in enumerate(instance.disks):
4647
      size = dev.size
4648
      info("adding new local storage on %s for disk/%d" %
4649
           (new_node, idx))
4650
      # since we *always* want to create this LV, we use the
4651
      # _Create...OnPrimary (which forces the creation), even if we
4652
      # are talking about the secondary node
4653
      for new_lv in dev.children:
4654
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4655
                                        _GetInstanceInfoText(instance)):
4656
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4657
                                   " node '%s'" %
4658
                                   (new_lv.logical_id[1], new_node))
4659

    
4660
    # Step 4: drbd minors and drbd setup changes
4661
    # after this, we must manually remove the drbd minors on both the
4662
    # error and the success paths
4663
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4664
                                   instance.name)
4665
    logging.debug("Allocated minors %s" % (minors,))
4666
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4667
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4668
      size = dev.size
4669
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4670
      # create new devices on new_node
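      # the DRBD8 logical_id is assumed to be the 6-tuple
      # (node_a, node_b, port, minor_a, minor_b, secret); the primary's
      # minor is kept and the freshly allocated minor is used on the
      # new secondary's side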
4671
      if pri_node == dev.logical_id[0]:
4672
        new_logical_id = (pri_node, new_node,
4673
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4674
                          dev.logical_id[5])
4675
      else:
4676
        new_logical_id = (new_node, pri_node,
4677
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4678
                          dev.logical_id[5])
4679
      iv_names[idx] = (dev, dev.children, new_logical_id)
4680
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4681
                    new_logical_id)
4682
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4683
                              logical_id=new_logical_id,
4684
                              children=dev.children)
4685
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4686
                                        new_drbd, False,
4687
                                        _GetInstanceInfoText(instance)):
4688
        self.cfg.ReleaseDRBDMinors(instance.name)
4689
        raise errors.OpExecError("Failed to create new DRBD on"
4690
                                 " node '%s'" % new_node)
4691

    
4692
    for idx, dev in enumerate(instance.disks):
4693
      # we have new devices, shutdown the drbd on the old secondary
4694
      info("shutting down drbd for disk/%d on old node" % idx)
4695
      cfg.SetDiskID(dev, old_node)
4696
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
4697
      if result.failed or not result.data:
4698
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4699
                hint="Please cleanup this device manually as soon as possible")
4700

    
4701
    info("detaching primary drbds from the network (=> standalone)")
4702
    done = 0
4703
    for idx, dev in enumerate(instance.disks):
4704
      cfg.SetDiskID(dev, pri_node)
4705
      # set the network part of the physical (unique in bdev terms) id
4706
      # to None, meaning detach from network
4707
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4708
      # and 'find' the device, which will 'fix' it to match the
4709
      # standalone state
4710
      result = self.rpc.call_blockdev_find(pri_node, dev)
4711
      if not result.failed and result.data:
4712
        done += 1
4713
      else:
4714
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4715
                idx)
4716

    
4717
    if not done:
4718
      # no detaches succeeded (very unlikely)
4719
      self.cfg.ReleaseDRBDMinors(instance.name)
4720
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4721

    
4722
    # if we managed to detach at least one, we update all the disks of
4723
    # the instance to point to the new secondary
4724
    info("updating instance configuration")
4725
    for dev, _, new_logical_id in iv_names.itervalues():
4726
      dev.logical_id = new_logical_id
4727
      cfg.SetDiskID(dev, pri_node)
4728
    cfg.Update(instance)
4729
    # we can remove now the temp minors as now the new values are
4730
    # written to the config file (and therefore stable)
4731
    self.cfg.ReleaseDRBDMinors(instance.name)
4732

    
4733
    # and now perform the drbd attach
4734
    info("attaching primary drbds to new secondary (standalone => connected)")
4735
    failures = []
4736
    for idx, dev in enumerate(instance.disks):
4737
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4738
      # since the attach is smart, it's enough to 'find' the device,
4739
      # it will automatically activate the network, if the physical_id
4740
      # is correct
4741
      cfg.SetDiskID(dev, pri_node)
4742
      logging.debug("Disk to attach: %s", dev)
4743
      result = self.rpc.call_blockdev_find(pri_node, dev)
4744
      if result.failed or not result.data:
4745
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4746
                "please do a gnt-instance info to see the status of disks")
4747

    
4748
    # this can fail as the old devices are degraded and _WaitForSync
4749
    # does a combined result over all disks, so we don't check its
4750
    # return value
4751
    self.proc.LogStep(5, steps_total, "sync devices")
4752
    _WaitForSync(self, instance, unlock=True)
4753

    
4754
    # so check manually all the devices
4755
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4756
      cfg.SetDiskID(dev, pri_node)
4757
      result = self.rpc.call_blockdev_find(pri_node, dev)
4758
      result.Raise()
4759
      if result.data[5]:
4760
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4761

    
4762
    self.proc.LogStep(6, steps_total, "removing old storage")
4763
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4764
      info("remove logical volumes for disk/%d" % idx)
4765
      for lv in old_lvs:
4766
        cfg.SetDiskID(lv, old_node)
4767
        result = self.rpc.call_blockdev_remove(old_node, lv)
4768
        if result.failed or not result.data:
4769
          warning("Can't remove LV on old secondary",
4770
                  hint="Cleanup stale volumes by hand")
4771

    
4772
  def Exec(self, feedback_fn):
4773
    """Execute disk replacement.
4774

4775
    This dispatches the disk replacement to the appropriate handler.
4776

4777
    """
4778
    instance = self.instance
4779

    
4780
    # Activate the instance disks if we're replacing them on a down instance
4781
    if instance.status == "down":
4782
      _StartInstanceDisks(self, instance, True)
4783

    
4784
    if instance.disk_template == constants.DT_DRBD8:
4785
      if self.op.remote_node is None:
4786
        fn = self._ExecD8DiskOnly
4787
      else:
4788
        fn = self._ExecD8Secondary
4789
    else:
4790
      raise errors.ProgrammerError("Unhandled disk replacement case")
4791

    
4792
    ret = fn(feedback_fn)
4793

    
4794
    # Deactivate the instance disks if we're replacing them on a down instance
4795
    if instance.status == "down":
4796
      _SafeShutdownInstanceDisks(self, instance)
4797

    
4798
    return ret
4799

    
class LUGrowDisk(LogicalUnit):
4802
  """Grow a disk of an instance.
4803

4804
  """
4805
  HPATH = "disk-grow"
4806
  HTYPE = constants.HTYPE_INSTANCE
4807
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4808
  REQ_BGL = False
4809

    
4810
  def ExpandNames(self):
4811
    self._ExpandAndLockInstance()
4812
    self.needed_locks[locking.LEVEL_NODE] = []
4813
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4814

    
4815
  def DeclareLocks(self, level):
4816
    if level == locking.LEVEL_NODE:
4817
      self._LockInstancesNodes()
4818

    
4819
  def BuildHooksEnv(self):
4820
    """Build hooks env.
4821

4822
    This runs on the master, the primary and all the secondaries.
4823

4824
    """
4825
    env = {
4826
      "DISK": self.op.disk,
4827
      "AMOUNT": self.op.amount,
4828
      }
4829
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4830
    nl = [
4831
      self.cfg.GetMasterNode(),
4832
      self.instance.primary_node,
4833
      ]
4834
    return env, nl, nl
4835

    
4836
  def CheckPrereq(self):
4837
    """Check prerequisites.
4838

4839
    This checks that the instance is in the cluster.
4840

4841
    """
4842
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4843
    assert instance is not None, \
4844
      "Cannot retrieve locked instance %s" % self.op.instance_name
4845
    _CheckNodeOnline(self, instance.primary_node)
4846
    for node in instance.secondary_nodes:
4847
      _CheckNodeOnline(self, node)
4848

    
4849

    
4850
    self.instance = instance
4851

    
4852
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4853
      raise errors.OpPrereqError("Instance's disk layout does not support"
4854
                                 " growing.")
4855

    
4856
    self.disk = instance.FindDisk(self.op.disk)
4857

    
4858
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4859
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4860
                                       instance.hypervisor)
4861
    for node in nodenames:
4862
      info = nodeinfo[node]
4863
      if info.failed or not info.data:
4864
        raise errors.OpPrereqError("Cannot get current information"
4865
                                   " from node '%s'" % node)
4866
      vg_free = info.data.get('vg_free', None)
4867
      if not isinstance(vg_free, int):
4868
        raise errors.OpPrereqError("Can't compute free disk space on"
4869
                                   " node %s" % node)
4870
      if self.op.amount > vg_free:
4871
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4872
                                   " %d MiB available, %d MiB required" %
4873
                                   (node, vg_free, self.op.amount))
4874

    
4875
  def Exec(self, feedback_fn):
4876
    """Execute disk grow.
4877

4878
    """
4879
    instance = self.instance
4880
    disk = self.disk
4881
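    # grow the backing storage on every node that holds this disk,
    # processing the secondaries before the primary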
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4882
      self.cfg.SetDiskID(disk, node)
4883
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4884
      result.Raise()
4885
      if (not result.data or not isinstance(result.data, (list, tuple)) or
4886
          len(result.data) != 2):
4887
        raise errors.OpExecError("Grow request failed to node %s" % node)
4888
      elif not result.data[0]:
4889
        raise errors.OpExecError("Grow request failed to node %s: %s" %
4890
                                 (node, result.data[1]))
4891
    disk.RecordGrow(self.op.amount)
4892
    self.cfg.Update(instance)
4893
    if self.op.wait_for_sync:
4894
      disk_abort = not _WaitForSync(self, instance)
4895
      if disk_abort:
4896
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4897
                             " status.\nPlease check the instance.")
4898

    
class LUQueryInstanceData(NoHooksLU):
4901
  """Query runtime instance data.
4902

4903
  """
4904
  _OP_REQP = ["instances", "static"]
4905
  REQ_BGL = False
4906

    
4907
  def ExpandNames(self):
4908
    self.needed_locks = {}
4909
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4910

    
4911
    if not isinstance(self.op.instances, list):
4912
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4913

    
4914
    if self.op.instances:
4915
      self.wanted_names = []
4916
      for name in self.op.instances:
4917
        full_name = self.cfg.ExpandInstanceName(name)
4918
        if full_name is None:
4919
          raise errors.OpPrereqError("Instance '%s' not known" %
4920
                                     self.op.instance_name)
4921
        self.wanted_names.append(full_name)
4922
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4923
    else:
4924
      self.wanted_names = None
4925
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4926

    
4927
    self.needed_locks[locking.LEVEL_NODE] = []
4928
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4929

    
4930
  def DeclareLocks(self, level):
4931
    if level == locking.LEVEL_NODE:
4932
      self._LockInstancesNodes()
4933

    
4934
  def CheckPrereq(self):
4935
    """Check prerequisites.
4936

4937
    This only checks the optional instance list against the existing names.
4938

4939
    """
4940
    if self.wanted_names is None:
4941
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4942

    
4943
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4944
                             in self.wanted_names]
4945
    return
4946

    
4947
  def _ComputeDiskStatus(self, instance, snode, dev):
4948
    """Compute block device status.
4949

4950
    """
4951
    static = self.op.static
4952
    if not static:
4953
      self.cfg.SetDiskID(dev, instance.primary_node)
4954
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4955
      dev_pstatus.Raise()
4956
      dev_pstatus = dev_pstatus.data
4957
    else:
4958
      dev_pstatus = None
4959

    
4960
    if dev.dev_type in constants.LDS_DRBD:
4961
      # we change the snode then (otherwise we use the one passed in)
4962
      if dev.logical_id[0] == instance.primary_node:
4963
        snode = dev.logical_id[1]
4964
      else:
4965
        snode = dev.logical_id[0]
4966

    
4967
    if snode and not static:
4968
      self.cfg.SetDiskID(dev, snode)
4969
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4970
      dev_sstatus.Raise()
4971
      dev_sstatus = dev_sstatus.data
4972
    else:
4973
      dev_sstatus = None
4974

    
4975
    if dev.children:
4976
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4977
                      for child in dev.children]
4978
    else:
4979
      dev_children = []
4980

    
4981
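    # pstatus/sstatus are the raw blockdev_find results from the primary
    # and (if any) secondary node, or None when only static data was asked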
    data = {
4982
      "iv_name": dev.iv_name,
4983
      "dev_type": dev.dev_type,
4984
      "logical_id": dev.logical_id,
4985
      "physical_id": dev.physical_id,
4986
      "pstatus": dev_pstatus,
4987
      "sstatus": dev_sstatus,
4988
      "children": dev_children,
4989
      "mode": dev.mode,
4990
      }
4991

    
4992
    return data
4993

    
4994
  def Exec(self, feedback_fn):
4995
    """Gather and return data"""
4996
    result = {}
4997

    
4998
    cluster = self.cfg.GetClusterInfo()
4999

    
5000
    for instance in self.wanted_instances:
5001
      if not self.op.static:
5002
        remote_info = self.rpc.call_instance_info(instance.primary_node,
5003
                                                  instance.name,
5004
                                                  instance.hypervisor)
5005
        remote_info.Raise()
5006
        remote_info = remote_info.data
5007
        if remote_info and "state" in remote_info:
5008
          remote_state = "up"
5009
        else:
5010
          remote_state = "down"
5011
      else:
5012
        remote_state = None
5013
      if instance.status == "down":
5014
        config_state = "down"
5015
      else:
5016
        config_state = "up"
5017

    
5018
      disks = [self._ComputeDiskStatus(instance, None, device)
5019
               for device in instance.disks]
5020

    
5021
      idict = {
5022
        "name": instance.name,
5023
        "config_state": config_state,
5024
        "run_state": remote_state,
5025
        "pnode": instance.primary_node,
5026
        "snodes": instance.secondary_nodes,
5027
        "os": instance.os,
5028
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5029
        "disks": disks,
5030
        "hypervisor": instance.hypervisor,
5031
        "network_port": instance.network_port,
5032
        "hv_instance": instance.hvparams,
5033
        "hv_actual": cluster.FillHV(instance),
5034
        "be_instance": instance.beparams,
5035
        "be_actual": cluster.FillBE(instance),
5036
        }
5037

    
5038
      result[instance.name] = idict
5039

    
5040
    return result
5041

    
class LUSetInstanceParams(LogicalUnit):
5044
  """Modifies an instances's parameters.
5045

5046
  """
5047
  HPATH = "instance-modify"
5048
  HTYPE = constants.HTYPE_INSTANCE
5049
  _OP_REQP = ["instance_name"]
5050
  REQ_BGL = False
5051

    
5052
  def CheckArguments(self):
5053
    if not hasattr(self.op, 'nics'):
5054
      self.op.nics = []
5055
    if not hasattr(self.op, 'disks'):
5056
      self.op.disks = []
5057
    if not hasattr(self.op, 'beparams'):
5058
      self.op.beparams = {}
5059
    if not hasattr(self.op, 'hvparams'):
5060
      self.op.hvparams = {}
5061
    self.op.force = getattr(self.op, "force", False)
5062
    if not (self.op.nics or self.op.disks or
5063
            self.op.hvparams or self.op.beparams):
5064
      raise errors.OpPrereqError("No changes submitted")
5065

    
5066
    utils.CheckBEParams(self.op.beparams)
5067

    
5068
    # Disk validation
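    # self.op.disks is a list of (operation, parameters) pairs, for
    # example [(constants.DDM_ADD, {'size': 1024, 'mode': 'rw'})],
    # [(constants.DDM_REMOVE, {})] or [(0, {'mode': 'ro'})] -- the
    # values shown are purely illustrative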
5069
    disk_addremove = 0
5070
    for disk_op, disk_dict in self.op.disks:
5071
      if disk_op == constants.DDM_REMOVE:
5072
        disk_addremove += 1
5073
        continue
5074
      elif disk_op == constants.DDM_ADD:
5075
        disk_addremove += 1
5076
      else:
5077
        if not isinstance(disk_op, int):
5078
          raise errors.OpPrereqError("Invalid disk index")
5079
      if disk_op == constants.DDM_ADD:
5080
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5081
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5082
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5083
        size = disk_dict.get('size', None)
5084
        if size is None:
5085
          raise errors.OpPrereqError("Required disk parameter size missing")
5086
        try:
5087
          size = int(size)
5088
        except ValueError, err:
5089
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5090
                                     str(err))
5091
        disk_dict['size'] = size
5092
      else:
5093
        # modification of disk
5094
        if 'size' in disk_dict:
5095
          raise errors.OpPrereqError("Disk size change not possible, use"
5096
                                     " grow-disk")
5097

    
5098
    if disk_addremove > 1:
5099
      raise errors.OpPrereqError("Only one disk add or remove operation"
5100
                                 " supported at a time")
5101

    
5102
    # NIC validation
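    # self.op.nics uses the same (operation, parameters) shape as the
    # disks above, e.g. [(constants.DDM_ADD, {'ip': None, 'bridge': None})]
    # or [(0, {'bridge': 'xen-br1'})] -- illustrative values only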
5103
    nic_addremove = 0
5104
    for nic_op, nic_dict in self.op.nics:
5105
      if nic_op == constants.DDM_REMOVE:
5106
        nic_addremove += 1
5107
        continue
5108
      elif nic_op == constants.DDM_ADD:
5109
        nic_addremove += 1
5110
      else:
5111
        if not isinstance(nic_op, int):
5112
          raise errors.OpPrereqError("Invalid nic index")
5113

    
5114
      # nic_dict should be a dict
5115
      nic_ip = nic_dict.get('ip', None)
5116
      if nic_ip is not None:
5117
        if nic_ip.lower() == "none":
5118
          nic_dict['ip'] = None
5119
        else:
5120
          if not utils.IsValidIP(nic_ip):
5121
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5122
      # we can only check None bridges and assign the default one
5123
      nic_bridge = nic_dict.get('bridge', None)
5124
      if nic_bridge is None:
5125
        nic_dict['bridge'] = self.cfg.GetDefBridge()
5126
      # but we can validate MACs
5127
      nic_mac = nic_dict.get('mac', None)
5128
      if nic_mac is not None:
5129
        if self.cfg.IsMacInUse(nic_mac):
5130
          raise errors.OpPrereqError("MAC address %s already in use"
5131
                                     " in cluster" % nic_mac)
5132
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5133
          if not utils.IsValidMac(nic_mac):
5134
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5135
    if nic_addremove > 1:
5136
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5137
                                 " supported at a time")
5138

    
5139
  def ExpandNames(self):
5140
    self._ExpandAndLockInstance()
5141
    self.needed_locks[locking.LEVEL_NODE] = []
5142
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5143

    
5144
  def DeclareLocks(self, level):
5145
    if level == locking.LEVEL_NODE:
5146
      self._LockInstancesNodes()
5147

    
5148
  def BuildHooksEnv(self):
5149
    """Build hooks env.
5150

5151
    This runs on the master, primary and secondaries.
5152

5153
    """
5154
    args = dict()
5155
    if constants.BE_MEMORY in self.be_new:
5156
      args['memory'] = self.be_new[constants.BE_MEMORY]
5157
    if constants.BE_VCPUS in self.be_new:
5158
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5159
    # FIXME: readd disk/nic changes
5160
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5161
    nl = [self.cfg.GetMasterNode(),
5162
          self.instance.primary_node] + list(self.instance.secondary_nodes)
5163
    return env, nl, nl
5164

    
5165
  def CheckPrereq(self):
5166
    """Check prerequisites.
5167

5168
    This only checks the instance list against the existing names.
5169

5170
    """
5171
    force = self.force = self.op.force
5172

    
5173
    # checking the new params on the primary/secondary nodes
5174

    
5175
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5176
    assert self.instance is not None, \
5177
      "Cannot retrieve locked instance %s" % self.op.instance_name
5178
    pnode = self.instance.primary_node
5179
    nodelist = [pnode]
5180
    nodelist.extend(instance.secondary_nodes)
5181

    
5182
    # hvparams processing
5183
    if self.op.hvparams:
5184
      i_hvdict = copy.deepcopy(instance.hvparams)
5185
      for key, val in self.op.hvparams.iteritems():
5186
        if val == constants.VALUE_DEFAULT:
5187
          try:
5188
            del i_hvdict[key]
5189
          except KeyError:
5190
            pass
5191
        elif val == constants.VALUE_NONE:
5192
          i_hvdict[key] = None
5193
        else:
5194
          i_hvdict[key] = val
5195
      cluster = self.cfg.GetClusterInfo()
5196
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5197
                                i_hvdict)
5198
      # local check
5199
      hypervisor.GetHypervisor(
5200
        instance.hypervisor).CheckParameterSyntax(hv_new)
5201
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5202
      self.hv_new = hv_new # the new actual values
5203
      self.hv_inst = i_hvdict # the new dict (without defaults)
5204
    else:
5205
      self.hv_new = self.hv_inst = {}
5206

    
5207
    # beparams processing
5208
    if self.op.beparams:
5209
      i_bedict = copy.deepcopy(instance.beparams)
5210
      for key, val in self.op.beparams.iteritems():
5211
        if val == constants.VALUE_DEFAULT:
5212
          try:
5213
            del i_bedict[key]
5214
          except KeyError:
5215
            pass
5216
        else:
5217
          i_bedict[key] = val
5218
      cluster = self.cfg.GetClusterInfo()
5219
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5220
                                i_bedict)
5221
      self.be_new = be_new # the new actual values
5222
      self.be_inst = i_bedict # the new dict (without defaults)
5223
    else:
5224
      self.be_new = self.be_inst = {}
5225

    
5226
    self.warn = []
5227

    
5228
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5229
      mem_check_list = [pnode]
5230
      if be_new[constants.BE_AUTO_BALANCE]:
5231
        # either we changed auto_balance to yes or it was from before
5232
        mem_check_list.extend(instance.secondary_nodes)
5233
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5234
                                                  instance.hypervisor)
5235
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5236
                                         instance.hypervisor)
5237
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5238
        # Assume the primary node is unreachable and go ahead
5239
        self.warn.append("Can't get info from primary node %s" % pnode)
5240
      else:
5241
        if not instance_info.failed and instance_info.data:
5242
          current_mem = instance_info.data['memory']
5243
        else:
5244
          # Assume instance not running
5245
          # (there is a slight race condition here, but it's not very probable,
5246
          # and we have no other way to check)
5247
          current_mem = 0
5248
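        # memory still missing on the primary node: the requested amount,
        # minus what the instance already uses there, minus the node's
        # free memory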
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5249
                    nodeinfo[pnode].data['memory_free'])
5250
        if miss_mem > 0:
5251
          raise errors.OpPrereqError("This change will prevent the instance"
5252
                                     " from starting, due to %d MB of memory"
5253
                                     " missing on its primary node" % miss_mem)
5254

    
5255
      if be_new[constants.BE_AUTO_BALANCE]:
5256
        for node in instance.secondary_nodes:
          nres = nodeinfo[node]
          if nres.failed or not isinstance(nres.data, dict):
5258
            self.warn.append("Can't get info from secondary node %s" % node)
5259
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5260
            self.warn.append("Not enough memory to failover instance to"
5261
                             " secondary node %s" % node)
5262

    
5263
    # NIC processing
5264
    for nic_op, nic_dict in self.op.nics:
5265
      if nic_op == constants.DDM_REMOVE:
5266
        if not instance.nics:
5267
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5268
        continue
5269
      if nic_op != constants.DDM_ADD:
5270
        # an existing nic
5271
        if nic_op < 0 or nic_op >= len(instance.nics):
5272
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5273
                                     " are 0 to %d" %
5274
                                     (nic_op, len(instance.nics)))
5275
      nic_bridge = nic_dict.get('bridge', None)
5276
      if nic_bridge is not None:
5277
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5278
          msg = ("Bridge '%s' doesn't exist on one of"
5279
                 " the instance nodes" % nic_bridge)
5280
          if self.force:
5281
            self.warn.append(msg)
5282
          else:
5283
            raise errors.OpPrereqError(msg)
5284

    
5285
    # DISK processing
5286
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5287
      raise errors.OpPrereqError("Disk operations not supported for"
5288
                                 " diskless instances")
5289
    for disk_op, disk_dict in self.op.disks:
5290
      if disk_op == constants.DDM_REMOVE:
5291
        if len(instance.disks) == 1:
5292
          raise errors.OpPrereqError("Cannot remove the last disk of"
5293
                                     " an instance")
5294
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5295
        ins_l = ins_l[pnode]
5296
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
5299
          raise errors.OpPrereqError("Instance is running, can't remove"
5300
                                     " disks.")
5301

    
5302
      if (disk_op == constants.DDM_ADD and
5303
          len(instance.nics) >= constants.MAX_DISKS):
5304
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5305
                                   " add more" % constants.MAX_DISKS)
5306
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5307
        # an existing disk
5308
        if disk_op < 0 or disk_op >= len(instance.disks):
5309
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5310
                                     " are 0 to %d" %
5311
                                     (disk_op, len(instance.disks)))
5312

    
5313
    return
5314

    
5315
  def Exec(self, feedback_fn):
5316
    """Modifies an instance.
5317

5318
    All parameters take effect only at the next restart of the instance.
5319

5320
    """
5321
    # Process here the warnings from CheckPrereq, as we don't have a
5322
    # feedback_fn there.
5323
    for warn in self.warn:
5324
      feedback_fn("WARNING: %s" % warn)
5325

    
5326
    result = []
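    # every change actually applied is recorded in result as a
    # (parameter, value) pair and returned to the caller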
5327
    instance = self.instance
5328
    # disk changes
5329
    for disk_op, disk_dict in self.op.disks:
5330
      if disk_op == constants.DDM_REMOVE:
5331
        # remove the last disk
5332
        device = instance.disks.pop()
5333
        device_idx = len(instance.disks)
5334
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5335
          self.cfg.SetDiskID(disk, node)
5336
          result = self.rpc.call_blockdev_remove(node, disk)
5337
          if result.failed or not result.data:
5338
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
5339
                                 " continuing anyway", device_idx, node)
5340
        result.append(("disk/%d" % device_idx, "remove"))
5341
      elif disk_op == constants.DDM_ADD:
5342
        # add a new disk
5343
        if instance.disk_template == constants.DT_FILE:
5344
          file_driver, file_path = instance.disks[0].logical_id
5345
          file_path = os.path.dirname(file_path)
5346
        else:
5347
          file_driver = file_path = None
5348
        disk_idx_base = len(instance.disks)
5349
        new_disk = _GenerateDiskTemplate(self,
5350
                                         instance.disk_template,
5351
                                         instance, instance.primary_node,
5352
                                         instance.secondary_nodes,
5353
                                         [disk_dict],
5354
                                         file_path,
5355
                                         file_driver,
5356
                                         disk_idx_base)[0]
5357
        new_disk.mode = disk_dict['mode']
5358
        instance.disks.append(new_disk)
5359
        info = _GetInstanceInfoText(instance)
5360

    
5361
        logging.info("Creating volume %s for instance %s",
5362
                     new_disk.iv_name, instance.name)
5363
        # Note: this needs to be kept in sync with _CreateDisks
5364
        #HARDCODE
5365
        for secondary_node in instance.secondary_nodes:
5366
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5367
                                            new_disk, False, info):
5368
            self.LogWarning("Failed to create volume %s (%s) on"
5369
                            " secondary node %s!",
5370
                            new_disk.iv_name, new_disk, secondary_node)
5371
        #HARDCODE
5372
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5373
                                        instance, new_disk, info):
5374
          self.LogWarning("Failed to create volume %s on primary!",
5375
                          new_disk.iv_name)
5376
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5377
                       (new_disk.size, new_disk.mode)))
5378
      else:
5379
        # change a given disk
5380
        instance.disks[disk_op].mode = disk_dict['mode']
5381
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5382
    # NIC changes
5383
    for nic_op, nic_dict in self.op.nics:
5384
      if nic_op == constants.DDM_REMOVE:
5385
        # remove the last nic
5386
        del instance.nics[-1]
5387
        result.append(("nic.%d" % len(instance.nics), "remove"))
5388
      elif nic_op == constants.DDM_ADD:
5389
        # add a new nic
5390
        if 'mac' not in nic_dict:
5391
          mac = constants.VALUE_GENERATE
5392
        else:
5393
          mac = nic_dict['mac']
5394
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5395
          mac = self.cfg.GenerateMAC()
5396
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5397
                              bridge=nic_dict.get('bridge', None))
5398
        instance.nics.append(new_nic)
5399
        result.append(("nic.%d" % (len(instance.nics) - 1),
5400
                       "add:mac=%s,ip=%s,bridge=%s" %
5401
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
5402
      else:
5403
        # change a given nic
5404
        for key in 'mac', 'ip', 'bridge':
5405
          if key in nic_dict:
5406
            setattr(instance.nics[nic_op], key, nic_dict[key])
5407
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5408

    
5409
    # hvparams changes
5410
    if self.op.hvparams:
5411
      instance.hvparams = self.hv_new
5412
      for key, val in self.op.hvparams.iteritems():
5413
        result.append(("hv/%s" % key, val))
5414

    
5415
    # beparams changes
5416
    if self.op.beparams:
5417
      instance.beparams = self.be_inst
5418
      for key, val in self.op.beparams.iteritems():
5419
        result.append(("be/%s" % key, val))
5420

    
5421
    self.cfg.Update(instance)
5422

    
5423
    return result
5424

    
class LUQueryExports(NoHooksLU):
5427
  """Query the exports list
5428

5429
  """
5430
  _OP_REQP = ['nodes']
5431
  REQ_BGL = False
5432

    
5433
  def ExpandNames(self):
5434
    self.needed_locks = {}
5435
    self.share_locks[locking.LEVEL_NODE] = 1
5436
    if not self.op.nodes:
5437
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5438
    else:
5439
      self.needed_locks[locking.LEVEL_NODE] = \
5440
        _GetWantedNodes(self, self.op.nodes)
5441

    
5442
  def CheckPrereq(self):
5443
    """Check prerequisites.
5444

5445
    """
5446
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5447

    
5448
  def Exec(self, feedback_fn):
5449
    """Compute the list of all the exported system images.
5450

5451
    @rtype: dict
5452
    @return: a dictionary with the structure node->(export-list)
5453
        where export-list is a list of the instances exported on
5454
        that node.
5455

5456
    """
5457
    rpcresult = self.rpc.call_export_list(self.nodes)
5458
    result = {}
5459
    for node in rpcresult:
5460
      if rpcresult[node].failed:
5461
        result[node] = False
5462
      else:
5463
        result[node] = rpcresult[node].data
5464

    
5465
    return result
5466

    
class LUExportInstance(LogicalUnit):
5469
  """Export an instance to an image in the cluster.
5470

5471
  """
5472
  HPATH = "instance-export"
5473
  HTYPE = constants.HTYPE_INSTANCE
5474
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
5475
  REQ_BGL = False
5476

    
5477
  def ExpandNames(self):
5478
    self._ExpandAndLockInstance()
5479
    # FIXME: lock only instance primary and destination node
5480
    #
5481
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
5483
    # remove it from its current node. In the future we could fix this by:
5484
    #  - making a tasklet to search (share-lock all), then create the new one,
5485
    #    then one to remove, after
5486
    #  - removing the removal operation altogether
5487
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5488

    
5489
  def DeclareLocks(self, level):
5490
    """Last minute lock declaration."""
5491
    # All nodes are locked anyway, so nothing to do here.
5492

    
5493
  def BuildHooksEnv(self):
5494
    """Build hooks env.
5495

5496
    This will run on the master, primary node and target node.
5497

5498
    """
5499
    env = {
5500
      "EXPORT_NODE": self.op.target_node,
5501
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5502
      }
5503
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5504
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5505
          self.op.target_node]
5506
    return env, nl, nl
5507

    
5508
  def CheckPrereq(self):
5509
    """Check prerequisites.
5510

5511
    This checks that the instance and node names are valid.
5512

5513
    """
5514
    instance_name = self.op.instance_name
5515
    self.instance = self.cfg.GetInstanceInfo(instance_name)
5516
    assert self.instance is not None, \
5517
          "Cannot retrieve locked instance %s" % self.op.instance_name
5518
    _CheckNodeOnline(self, self.instance.primary_node)
5519

    
5520
    self.dst_node = self.cfg.GetNodeInfo(
5521
      self.cfg.ExpandNodeName(self.op.target_node))
5522

    
5523
    if self.dst_node is None:
5524
      # This is wrong node name, not a non-locked node
5525
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5526
    _CheckNodeOnline(self, self.op.target_node)
5527

    
5528
    # instance disk type verification
5529
    for disk in self.instance.disks:
5530
      if disk.dev_type == constants.LD_FILE:
5531
        raise errors.OpPrereqError("Export not supported for instances with"
5532
                                   " file-based disks")
5533

    
5534
  def Exec(self, feedback_fn):
5535
    """Export an instance to an image in the cluster.
5536

5537
    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
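      # if we shut the instance down above, restart it now whether or not
      # the snapshots succeeded, so the instance is not left down on errors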
      if self.op.shutdown and instance.status == "up":
        result = self.rpc.call_instance_start(src_node, instance, None)
        if result.failed or not result.data:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
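    # copy each successful snapshot to the destination node, then remove the
    # snapshot from the source node; failures are only logged so the
    # remaining disks still get exported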
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        result = self.rpc.call_blockdev_remove(src_node, dev)
        if result.failed or not result.data:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup we just made would be removed, because
    # OpQueryExports substitutes an empty list with the full cluster node
    # list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

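    # query the export lists of all locked nodes and remove the export
    # wherever it is found; per-node RPC failures are logged and skipped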
    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
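    # cluster tags need no extra locks; node and instance tags lock the
    # corresponding object after expanding its name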
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
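    # collect (path, object) pairs for the cluster itself and for every
    # instance and node, then match each tag against the compiled pattern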
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but it is not always appropriate to
      # use it this way in ExpandNames; check the LogicalUnit.ExpandNames
      # docstring for more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
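  # each mode requires exactly the keys in the corresponding list above;
  # __init__ rejects both missing and unknown keyword arguments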

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
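          # an instance may currently use less memory than its configured
          # BE_MEMORY; treat the difference as reserved rather than free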
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # build the per-node result dict
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        "offline": ninfo.offline,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

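    # result.data is a 4-tuple: runner status code, the allocator's stdout,
    # its stderr and a failure message (if any)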
    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

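    # direction "in" only returns the generated allocator input text;
    # direction "out" actually runs the named allocator and returns its raw
    # (unvalidated) output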
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result