#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and to ensure the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If the hook should run on no nodes, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


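# ---------------------------------------------------------------------------
# Illustrative sketch (editorial addition, not part of the original module):
# a minimal concurrent LU following the contract described in the LogicalUnit
# docstring above. The behaviour (a no-op) and the class name are assumptions
# made purely for illustration; the real LUs are defined further below.
class _LUExampleNoop(NoHooksLU):
  """Example no-op LU illustrating the LogicalUnit contract.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # A concurrent LU must declare its locks; an empty dict means
    # "no locks needed" (leaving needed_locks as None is not allowed).
    self.needed_locks = {}

  def CheckPrereq(self):
    # Nothing to verify for a no-op; a real LU would raise
    # errors.OpPrereqError here if its prerequisites were not met.
    pass

  def Exec(self, feedback_fn):
    # The actual work goes here; errors.OpExecError signals failures.
    feedback_fn("Example no-op LU executed")
    return True
# ---------------------------------------------------------------------------

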
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


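# Example (editorial note, illustrative only; the instance name is an
# assumption): passing an empty list returns every instance known to the
# configuration, while passing explicit names expands and validates them:
#   _GetWantedInstances(lu, [])                      -> all instances, sorted
#   _GetWantedInstances(lu, ["inst1.example.com"])   -> ["inst1.example.com"]
# Unknown names raise errors.OpPrereqError.

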
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


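# Example (editorial note, illustrative only; the field names are
# assumptions): with static fields ("name", "pinst_cnt") and dynamic fields
# ("dtotal", "dfree"), selecting ["name", "bogus"] raises OpPrereqError
# naming the unknown field:
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dtotal", "dfree"),
#                      selected=["name", "bogus"])

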
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


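# Example (editorial note; all values below are illustrative assumptions):
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-etch", "up",
#                         128, 1, [("192.0.2.10", "xen-br0",
#                                   "aa:00:00:11:22:33")])
# yields the common INSTANCE_* variables plus INSTANCE_NIC_COUNT == 1 and one
# set of INSTANCE_NIC0_IP / INSTANCE_NIC0_BRIDGE / INSTANCE_NIC0_HWADDR keys.

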
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    vglist = node_result.get(constants.NV_VGLIST, None)
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all the instances it is supposed to take over,
      # should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: nodelist,
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo],
      constants.NV_LVLIST: vg_name,
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VGLIST: None,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, lvdata.encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


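# Illustrative usage sketch (editorial addition; the exact processor API is
# an assumption, not taken from this file): LUs such as LUVerifyCluster are
# not instantiated directly. The master builds an opcode and hands it to the
# processor in ganeti.mcpu, which locates and runs the matching LU, roughly:
#   op = opcodes.OpVerifyCluster(skip_checks=[])
#   mcpu.Processor(context).ExecOpCode(op, feedback_fn)

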
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        self.LogWarning("Connection to node %s failed: %s" %
                        (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


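# Note (editorial, derived from the code above): LUVerifyDisks.Exec returns
# the 4-tuple (res_nodes, res_nlvm, res_instances, res_missing): nodes that
# could not be queried, per-node LVM error strings, names of instances with
# offline logical volumes, and a map of instance name to the (node, volume)
# pairs that are missing.

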
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


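# Example (editorial, illustrative): for the common DRBD-over-LVM layout,
# where a constants.LD_DRBD8 disk has two constants.LD_LV children,
# _RecursiveCheckIfLVMBased returns True because the recursion finds an
# LD_LV device among the children; a purely file-based disk returns False.

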
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.CheckBEParams(self.op.beparams)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      node_info = self.cfg.GetAllNodesInfo().values()
      num_candidates = len([node for node in node_info
                            if node.master_candidate])
      num_nodes = len(node_info)
      if num_candidates < self.op.candidate_pool_size:
        random.shuffle(node_info)
        for node in node_info:
          if num_candidates >= self.op.candidate_pool_size:
            break
          if node.master_candidate:
            continue
          node.master_candidate = True
          self.LogInfo("Promoting node %s to master candidate", node.name)
          self.cfg.Update(node)
          self.context.ReaddNode(node)
          num_candidates += 1
      elif num_candidates > self.op.candidate_pool_size:
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
                     " of candidate_pool_size (%d)" %
                     (num_candidates, self.op.candidate_pool_size))


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384
  """Sleep and poll for an instance's disk to sync.
1385

1386
  """
1387
  if not instance.disks:
1388
    return True
1389

    
1390
  if not oneshot:
1391
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392

    
1393
  node = instance.primary_node
1394

    
1395
  for dev in instance.disks:
1396
    lu.cfg.SetDiskID(dev, node)
1397

    
1398
  retries = 0
1399
  while True:
1400
    max_time = 0
1401
    done = True
1402
    cumul_degraded = False
1403
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404
    if rstats.failed or not rstats.data:
1405
      lu.LogWarning("Can't get any data from node %s", node)
1406
      retries += 1
1407
      if retries >= 10:
1408
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1409
                                 " aborting." % node)
1410
      time.sleep(6)
1411
      continue
1412
    rstats = rstats.data
1413
    retries = 0
1414
    for i in range(len(rstats)):
1415
      mstat = rstats[i]
1416
      if mstat is None:
1417
        lu.LogWarning("Can't compute data for node %s/%s",
1418
                           node, instance.disks[i].iv_name)
1419
        continue
1420
      # we ignore the ldisk parameter
1421
      perc_done, est_time, is_degraded, _ = mstat
1422
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423
      if perc_done is not None:
1424
        done = False
1425
        if est_time is not None:
1426
          rem_time = "%d estimated seconds remaining" % est_time
1427
          max_time = est_time
1428
        else:
1429
          rem_time = "no time estimate"
1430
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431
                        (instance.disks[i].iv_name, perc_done, rem_time))
1432
    if done or oneshot:
1433
      break
1434

    
1435
    time.sleep(min(60, max_time))
1436

    
1437
  if done:
1438
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439
  return not cumul_degraded
1440

    
1441

    
1442
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443
  """Check that mirrors are not degraded.
1444

1445
  The ldisk parameter, if True, will change the test from the
1446
  is_degraded attribute (which represents overall non-ok status for
1447
  the device(s)) to the ldisk (representing the local storage status).
1448

1449
  """
1450
  lu.cfg.SetDiskID(dev, node)
1451
  if ldisk:
1452
    idx = 6
1453
  else:
1454
    idx = 5
1455

    
1456
  result = True
1457
  if on_primary or dev.AssembleOnSecondary():
1458
    rstats = lu.rpc.call_blockdev_find(node, dev)
1459
    if rstats.failed or not rstats.data:
1460
      logging.warning("Node %s: disk degraded, not found or node down", node)
1461
      result = False
1462
    else:
1463
      result = result and (not rstats.data[idx])
1464
  if dev.children:
1465
    for child in dev.children:
1466
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1467

    
1468
  return result
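# Illustrative usage sketch (not part of the original module): before moving
# an instance to another node, a caller could check the local storage status
# on that node by passing ldisk=True ("target_node" below is an assumed
# variable naming the destination node):
#
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(self, dev, target_node, False, ldisk=True):
#       raise errors.OpExecError("Disk %s is degraded on target node %s" %
#                                (dev.iv_name, target_node))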
1469

    
1470

    
1471
class LUDiagnoseOS(NoHooksLU):
1472
  """Logical unit for OS diagnose/query.
1473

1474
  """
1475
  _OP_REQP = ["output_fields", "names"]
1476
  REQ_BGL = False
1477
  _FIELDS_STATIC = utils.FieldSet()
1478
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479

    
1480
  def ExpandNames(self):
1481
    if self.op.names:
1482
      raise errors.OpPrereqError("Selective OS query not supported")
1483

    
1484
    _CheckOutputFields(static=self._FIELDS_STATIC,
1485
                       dynamic=self._FIELDS_DYNAMIC,
1486
                       selected=self.op.output_fields)
1487

    
1488
    # Lock all nodes, in shared mode
1489
    self.needed_locks = {}
1490
    self.share_locks[locking.LEVEL_NODE] = 1
1491
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492

    
1493
  def CheckPrereq(self):
1494
    """Check prerequisites.
1495

1496
    """
1497

    
1498
  @staticmethod
1499
  def _DiagnoseByOS(node_list, rlist):
1500
    """Remaps a per-node return list into an a per-os per-node dictionary
1501

1502
    @param node_list: a list with the names of all nodes
1503
    @param rlist: a map with node names as keys and OS objects as values
1504

1505
    @rtype: dict
1506
    @return: a dictionary with osnames as keys and as value another map, with
1507
        nodes as keys and list of OS objects as values, eg::
1508

1509
          {"debian-etch": {"node1": [<object>,...],
1510
                           "node2": [<object>,]}
1511
          }
1512

1513
    """
1514
    all_os = {}
1515
    for node_name, nr in rlist.iteritems():
1516
      if nr.failed or not nr.data:
1517
        continue
1518
      for os_obj in nr.data:
1519
        if os_obj.name not in all_os:
1520
          # build a list of nodes for this os containing empty lists
1521
          # for each node in node_list
1522
          all_os[os_obj.name] = {}
1523
          for nname in node_list:
1524
            all_os[os_obj.name][nname] = []
1525
        all_os[os_obj.name][node_name].append(os_obj)
1526
    return all_os
1527

    
1528
  def Exec(self, feedback_fn):
1529
    """Compute the list of OSes.
1530

1531
    """
1532
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1533
    node_data = self.rpc.call_os_diagnose(node_list)
1534
    if node_data == False:
1535
      raise errors.OpExecError("Can't gather the list of OSes")
1536
    pol = self._DiagnoseByOS(node_list, node_data)
1537
    output = []
1538
    for os_name, os_data in pol.iteritems():
1539
      row = []
1540
      for field in self.op.output_fields:
1541
        if field == "name":
1542
          val = os_name
1543
        elif field == "valid":
1544
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1545
        elif field == "node_status":
1546
          val = {}
1547
          for node_name, nos_list in os_data.iteritems():
1548
            val[node_name] = [(v.status, v.path) for v in nos_list]
1549
        else:
1550
          raise errors.ParameterError(field)
1551
        row.append(val)
1552
      output.append(row)
1553

    
1554
    return output
1555

    
1556

    
1557
class LURemoveNode(LogicalUnit):
1558
  """Logical unit for removing a node.
1559

1560
  """
1561
  HPATH = "node-remove"
1562
  HTYPE = constants.HTYPE_NODE
1563
  _OP_REQP = ["node_name"]
1564

    
1565
  def BuildHooksEnv(self):
1566
    """Build hooks env.
1567

1568
    This doesn't run on the target node in the pre phase as a failed
1569
    node would then be impossible to remove.
1570

1571
    """
1572
    env = {
1573
      "OP_TARGET": self.op.node_name,
1574
      "NODE_NAME": self.op.node_name,
1575
      }
1576
    all_nodes = self.cfg.GetNodeList()
1577
    all_nodes.remove(self.op.node_name)
1578
    return env, all_nodes, all_nodes
1579

    
1580
  def CheckPrereq(self):
1581
    """Check prerequisites.
1582

1583
    This checks:
1584
     - the node exists in the configuration
1585
     - it does not have primary or secondary instances
1586
     - it's not the master
1587

1588
    Any errors are signalled by raising errors.OpPrereqError.
1589

1590
    """
1591
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1592
    if node is None:
1593
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1594

    
1595
    instance_list = self.cfg.GetInstanceList()
1596

    
1597
    masternode = self.cfg.GetMasterNode()
1598
    if node.name == masternode:
1599
      raise errors.OpPrereqError("Node is the master node,"
1600
                                 " you need to failover first.")
1601

    
1602
    for instance_name in instance_list:
1603
      instance = self.cfg.GetInstanceInfo(instance_name)
1604
      if node.name == instance.primary_node:
1605
        raise errors.OpPrereqError("Instance %s still running on the node,"
1606
                                   " please remove first." % instance_name)
1607
      if node.name in instance.secondary_nodes:
1608
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609
                                   " please remove first." % instance_name)
1610
    self.op.node_name = node.name
1611
    self.node = node
1612

    
1613
  def Exec(self, feedback_fn):
1614
    """Removes the node from the cluster.
1615

1616
    """
1617
    node = self.node
1618
    logging.info("Stopping the node daemon and removing configs from node %s",
1619
                 node.name)
1620

    
1621
    self.context.RemoveNode(node.name)
1622

    
1623
    self.rpc.call_node_leave_cluster(node.name)
1624

    
1625
    # Promote nodes to master candidate as needed
1626
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1627
    node_info = self.cfg.GetAllNodesInfo().values()
1628
    num_candidates = len([n for n in node_info
1629
                          if n.master_candidate])
1630
    num_nodes = len(node_info)
1631
    random.shuffle(node_info)
1632
    for node in node_info:
1633
      if num_candidates >= cp_size or num_candidates >= num_nodes:
1634
        break
1635
      if node.master_candidate:
1636
        continue
1637
      node.master_candidate = True
1638
      self.LogInfo("Promoting node %s to master candidate", node.name)
1639
      self.cfg.Update(node)
1640
      self.context.ReaddNode(node)
1641
      num_candidates += 1
1642

    
1643

    
1644
class LUQueryNodes(NoHooksLU):
1645
  """Logical unit for querying nodes.
1646

1647
  """
1648
  _OP_REQP = ["output_fields", "names"]
1649
  REQ_BGL = False
1650
  _FIELDS_DYNAMIC = utils.FieldSet(
1651
    "dtotal", "dfree",
1652
    "mtotal", "mnode", "mfree",
1653
    "bootid",
1654
    "ctotal",
1655
    )
1656

    
1657
  _FIELDS_STATIC = utils.FieldSet(
1658
    "name", "pinst_cnt", "sinst_cnt",
1659
    "pinst_list", "sinst_list",
1660
    "pip", "sip", "tags",
1661
    "serial_no",
1662
    "master_candidate",
1663
    "master",
1664
    )
1665

    
1666
  def ExpandNames(self):
1667
    _CheckOutputFields(static=self._FIELDS_STATIC,
1668
                       dynamic=self._FIELDS_DYNAMIC,
1669
                       selected=self.op.output_fields)
1670

    
1671
    self.needed_locks = {}
1672
    self.share_locks[locking.LEVEL_NODE] = 1
1673

    
1674
    if self.op.names:
1675
      self.wanted = _GetWantedNodes(self, self.op.names)
1676
    else:
1677
      self.wanted = locking.ALL_SET
1678

    
1679
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1680
    if self.do_locking:
1681
      # if we don't request only static fields, we need to lock the nodes
1682
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1683

    
1684

    
1685
  def CheckPrereq(self):
1686
    """Check prerequisites.
1687

1688
    """
1689
    # The validation of the node list is done in the _GetWantedNodes,
1690
    # if non empty, and if empty, there's no validation to do
1691
    pass
1692

    
1693
  def Exec(self, feedback_fn):
1694
    """Computes the list of nodes and their attributes.
1695

1696
    """
1697
    all_info = self.cfg.GetAllNodesInfo()
1698
    if self.do_locking:
1699
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1700
    elif self.wanted != locking.ALL_SET:
1701
      nodenames = self.wanted
1702
      missing = set(nodenames).difference(all_info.keys())
1703
      if missing:
1704
        raise errors.OpExecError(
1705
          "Some nodes were removed before retrieving their data: %s" % missing)
1706
    else:
1707
      nodenames = all_info.keys()
1708

    
1709
    nodenames = utils.NiceSort(nodenames)
1710
    nodelist = [all_info[name] for name in nodenames]
1711

    
1712
    # begin data gathering
1713

    
1714
    if self.do_locking:
1715
      live_data = {}
1716
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1717
                                          self.cfg.GetHypervisorType())
1718
      for name in nodenames:
1719
        nodeinfo = node_data[name]
1720
        if not nodeinfo.failed and nodeinfo.data:
1721
          nodeinfo = nodeinfo.data
1722
          fn = utils.TryConvert
1723
          live_data[name] = {
1724
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1725
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1726
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1727
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1728
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1729
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1730
            "bootid": nodeinfo.get('bootid', None),
1731
            }
1732
        else:
1733
          live_data[name] = {}
1734
    else:
1735
      live_data = dict.fromkeys(nodenames, {})
1736

    
1737
    node_to_primary = dict([(name, set()) for name in nodenames])
1738
    node_to_secondary = dict([(name, set()) for name in nodenames])
1739

    
1740
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1741
                             "sinst_cnt", "sinst_list"))
1742
    if inst_fields & frozenset(self.op.output_fields):
1743
      instancelist = self.cfg.GetInstanceList()
1744

    
1745
      for instance_name in instancelist:
1746
        inst = self.cfg.GetInstanceInfo(instance_name)
1747
        if inst.primary_node in node_to_primary:
1748
          node_to_primary[inst.primary_node].add(inst.name)
1749
        for secnode in inst.secondary_nodes:
1750
          if secnode in node_to_secondary:
1751
            node_to_secondary[secnode].add(inst.name)
1752

    
1753
    master_node = self.cfg.GetMasterNode()
1754

    
1755
    # end data gathering
1756

    
1757
    output = []
1758
    for node in nodelist:
1759
      node_output = []
1760
      for field in self.op.output_fields:
1761
        if field == "name":
1762
          val = node.name
1763
        elif field == "pinst_list":
1764
          val = list(node_to_primary[node.name])
1765
        elif field == "sinst_list":
1766
          val = list(node_to_secondary[node.name])
1767
        elif field == "pinst_cnt":
1768
          val = len(node_to_primary[node.name])
1769
        elif field == "sinst_cnt":
1770
          val = len(node_to_secondary[node.name])
1771
        elif field == "pip":
1772
          val = node.primary_ip
1773
        elif field == "sip":
1774
          val = node.secondary_ip
1775
        elif field == "tags":
1776
          val = list(node.GetTags())
1777
        elif field == "serial_no":
1778
          val = node.serial_no
1779
        elif field == "master_candidate":
1780
          val = node.master_candidate
1781
        elif field == "master":
1782
          val = node.name == master_node
1783
        elif self._FIELDS_DYNAMIC.Matches(field):
1784
          val = live_data[node.name].get(field, None)
1785
        else:
1786
          raise errors.ParameterError(field)
1787
        node_output.append(val)
1788
      output.append(node_output)
1789

    
1790
    return output
1791

    
1792

    
1793
class LUQueryNodeVolumes(NoHooksLU):
1794
  """Logical unit for getting volumes on node(s).
1795

1796
  """
1797
  _OP_REQP = ["nodes", "output_fields"]
1798
  REQ_BGL = False
1799
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1800
  _FIELDS_STATIC = utils.FieldSet("node")
1801

    
1802
  def ExpandNames(self):
1803
    _CheckOutputFields(static=self._FIELDS_STATIC,
1804
                       dynamic=self._FIELDS_DYNAMIC,
1805
                       selected=self.op.output_fields)
1806

    
1807
    self.needed_locks = {}
1808
    self.share_locks[locking.LEVEL_NODE] = 1
1809
    if not self.op.nodes:
1810
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1811
    else:
1812
      self.needed_locks[locking.LEVEL_NODE] = \
1813
        _GetWantedNodes(self, self.op.nodes)
1814

    
1815
  def CheckPrereq(self):
1816
    """Check prerequisites.
1817

1818
    This checks that the fields required are valid output fields.
1819

1820
    """
1821
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1822

    
1823
  def Exec(self, feedback_fn):
1824
    """Computes the list of nodes and their attributes.
1825

1826
    """
1827
    nodenames = self.nodes
1828
    volumes = self.rpc.call_node_volumes(nodenames)
1829

    
1830
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1831
             in self.cfg.GetInstanceList()]
1832

    
1833
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1834

    
1835
    output = []
1836
    for node in nodenames:
1837
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1838
        continue
1839

    
1840
      node_vols = volumes[node].data[:]
1841
      node_vols.sort(key=lambda vol: vol['dev'])
1842

    
1843
      for vol in node_vols:
1844
        node_output = []
1845
        for field in self.op.output_fields:
1846
          if field == "node":
1847
            val = node
1848
          elif field == "phys":
1849
            val = vol['dev']
1850
          elif field == "vg":
1851
            val = vol['vg']
1852
          elif field == "name":
1853
            val = vol['name']
1854
          elif field == "size":
1855
            val = int(float(vol['size']))
1856
          elif field == "instance":
1857
            for inst in ilist:
1858
              if node not in lv_by_node[inst]:
1859
                continue
1860
              if vol['name'] in lv_by_node[inst][node]:
1861
                val = inst.name
1862
                break
1863
            else:
1864
              val = '-'
1865
          else:
1866
            raise errors.ParameterError(field)
1867
          node_output.append(str(val))
1868

    
1869
        output.append(node_output)
1870

    
1871
    return output
1872

    
1873

    
1874
class LUAddNode(LogicalUnit):
1875
  """Logical unit for adding node to the cluster.
1876

1877
  """
1878
  HPATH = "node-add"
1879
  HTYPE = constants.HTYPE_NODE
1880
  _OP_REQP = ["node_name"]
1881

    
1882
  def BuildHooksEnv(self):
1883
    """Build hooks env.
1884

1885
    This will run on all nodes before, and on all nodes + the new node after.
1886

1887
    """
1888
    env = {
1889
      "OP_TARGET": self.op.node_name,
1890
      "NODE_NAME": self.op.node_name,
1891
      "NODE_PIP": self.op.primary_ip,
1892
      "NODE_SIP": self.op.secondary_ip,
1893
      }
1894
    nodes_0 = self.cfg.GetNodeList()
1895
    nodes_1 = nodes_0 + [self.op.node_name, ]
1896
    return env, nodes_0, nodes_1
1897

    
1898
  def CheckPrereq(self):
1899
    """Check prerequisites.
1900

1901
    This checks:
1902
     - the new node is not already in the config
1903
     - it is resolvable
1904
     - its parameters (single/dual homed) matches the cluster
1905

1906
    Any errors are signalled by raising errors.OpPrereqError.
1907

1908
    """
1909
    node_name = self.op.node_name
1910
    cfg = self.cfg
1911

    
1912
    dns_data = utils.HostInfo(node_name)
1913

    
1914
    node = dns_data.name
1915
    primary_ip = self.op.primary_ip = dns_data.ip
1916
    secondary_ip = getattr(self.op, "secondary_ip", None)
1917
    if secondary_ip is None:
1918
      secondary_ip = primary_ip
1919
    if not utils.IsValidIP(secondary_ip):
1920
      raise errors.OpPrereqError("Invalid secondary IP given")
1921
    self.op.secondary_ip = secondary_ip
1922

    
1923
    node_list = cfg.GetNodeList()
1924
    if not self.op.readd and node in node_list:
1925
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1926
                                 node)
1927
    elif self.op.readd and node not in node_list:
1928
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1929

    
1930
    for existing_node_name in node_list:
1931
      existing_node = cfg.GetNodeInfo(existing_node_name)
1932

    
1933
      if self.op.readd and node == existing_node_name:
1934
        if (existing_node.primary_ip != primary_ip or
1935
            existing_node.secondary_ip != secondary_ip):
1936
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1937
                                     " address configuration as before")
1938
        continue
1939

    
1940
      if (existing_node.primary_ip == primary_ip or
1941
          existing_node.secondary_ip == primary_ip or
1942
          existing_node.primary_ip == secondary_ip or
1943
          existing_node.secondary_ip == secondary_ip):
1944
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1945
                                   " existing node %s" % existing_node.name)
1946

    
1947
    # check that the type of the node (single versus dual homed) is the
1948
    # same as for the master
1949
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1950
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1951
    newbie_singlehomed = secondary_ip == primary_ip
1952
    if master_singlehomed != newbie_singlehomed:
1953
      if master_singlehomed:
1954
        raise errors.OpPrereqError("The master has no private ip but the"
1955
                                   " new node has one")
1956
      else:
1957
        raise errors.OpPrereqError("The master has a private ip but the"
1958
                                   " new node doesn't have one")
1959

    
1960
    # checks reachability
1961
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1962
      raise errors.OpPrereqError("Node not reachable by ping")
1963

    
1964
    if not newbie_singlehomed:
1965
      # check reachability from my secondary ip to newbie's secondary ip
1966
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1967
                           source=myself.secondary_ip):
1968
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1969
                                   " based ping to noded port")
1970

    
1971
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1972
    node_info = self.cfg.GetAllNodesInfo().values()
1973
    num_candidates = len([n for n in node_info
1974
                          if n.master_candidate])
1975
    master_candidate = num_candidates < cp_size
1976

    
1977
    self.new_node = objects.Node(name=node,
1978
                                 primary_ip=primary_ip,
1979
                                 secondary_ip=secondary_ip,
1980
                                 master_candidate=master_candidate,
1981
                                 offline=False)
1982

    
1983
  def Exec(self, feedback_fn):
1984
    """Adds the new node to the cluster.
1985

1986
    """
1987
    new_node = self.new_node
1988
    node = new_node.name
1989

    
1990
    # check connectivity
1991
    result = self.rpc.call_version([node])[node]
1992
    result.Raise()
1993
    if result.data:
1994
      if constants.PROTOCOL_VERSION == result.data:
1995
        logging.info("Communication to node %s fine, sw version %s match",
1996
                     node, result.data)
1997
      else:
1998
        raise errors.OpExecError("Version mismatch master version %s,"
1999
                                 " node version %s" %
2000
                                 (constants.PROTOCOL_VERSION, result.data))
2001
    else:
2002
      raise errors.OpExecError("Cannot get version from the new node")
2003

    
2004
    # setup ssh on node
2005
    logging.info("Copy ssh key to node %s", node)
2006
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2007
    keyarray = []
2008
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2009
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2010
                priv_key, pub_key]
2011

    
2012
    for i in keyfiles:
2013
      f = open(i, 'r')
2014
      try:
2015
        keyarray.append(f.read())
2016
      finally:
2017
        f.close()
2018

    
2019
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2020
                                    keyarray[2],
2021
                                    keyarray[3], keyarray[4], keyarray[5])
2022

    
2023
    if result.failed or not result.data:
2024
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2025

    
2026
    # Add node to our /etc/hosts, and add key to known_hosts
2027
    utils.AddHostToEtcHosts(new_node.name)
2028

    
2029
    if new_node.secondary_ip != new_node.primary_ip:
2030
      result = self.rpc.call_node_has_ip_address(new_node.name,
2031
                                                 new_node.secondary_ip)
2032
      if result.failed or not result.data:
2033
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2034
                                 " you gave (%s). Please fix and re-run this"
2035
                                 " command." % new_node.secondary_ip)
2036

    
2037
    node_verify_list = [self.cfg.GetMasterNode()]
2038
    node_verify_param = {
2039
      'nodelist': [node],
2040
      # TODO: do a node-net-test as well?
2041
    }
2042

    
2043
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2044
                                       self.cfg.GetClusterName())
2045
    for verifier in node_verify_list:
2046
      if result[verifier].failed or not result[verifier].data:
2047
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2048
                                 " for remote verification" % verifier)
2049
      if result[verifier].data['nodelist']:
2050
        for failed in result[verifier].data['nodelist']:
2051
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2052
                      (verifier, result[verifier].data['nodelist'][failed]))
2053
        raise errors.OpExecError("ssh/hostname verification failed.")
2054

    
2055
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2056
    # including the node just added
2057
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2058
    dist_nodes = self.cfg.GetNodeList()
2059
    if not self.op.readd:
2060
      dist_nodes.append(node)
2061
    if myself.name in dist_nodes:
2062
      dist_nodes.remove(myself.name)
2063

    
2064
    logging.debug("Copying hosts and known_hosts to all nodes")
2065
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2066
      result = self.rpc.call_upload_file(dist_nodes, fname)
2067
      for to_node, to_result in result.iteritems():
2068
        if to_result.failed or not to_result.data:
2069
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2070

    
2071
    to_copy = []
2072
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2073
      to_copy.append(constants.VNC_PASSWORD_FILE)
2074
    for fname in to_copy:
2075
      result = self.rpc.call_upload_file([node], fname)
2076
      if result[node].failed or not result[node].data:
2077
        logging.error("Could not copy file %s to node %s", fname, node)
2078

    
2079
    if self.op.readd:
2080
      self.context.ReaddNode(new_node)
2081
    else:
2082
      self.context.AddNode(new_node)
2083

    
2084

    
2085
class LUSetNodeParams(LogicalUnit):
2086
  """Modifies the parameters of a node.
2087

2088
  """
2089
  HPATH = "node-modify"
2090
  HTYPE = constants.HTYPE_NODE
2091
  _OP_REQP = ["node_name"]
2092
  REQ_BGL = False
2093

    
2094
  def CheckArguments(self):
2095
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2096
    if node_name is None:
2097
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2098
    self.op.node_name = node_name
2099
    if not hasattr(self.op, 'master_candidate'):
2100
      raise errors.OpPrereqError("Please pass at least one modification")
2101
    self.op.master_candidate = bool(self.op.master_candidate)
2102

    
2103
  def ExpandNames(self):
2104
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2105

    
2106
  def BuildHooksEnv(self):
2107
    """Build hooks env.
2108

2109
    This runs on the master node.
2110

2111
    """
2112
    env = {
2113
      "OP_TARGET": self.op.node_name,
2114
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2115
      }
2116
    nl = [self.cfg.GetMasterNode(),
2117
          self.op.node_name]
2118
    return env, nl, nl
2119

    
2120
  def CheckPrereq(self):
2121
    """Check prerequisites.
2122

2123
    This only checks the instance list against the existing names.
2124

2125
    """
2126
    force = self.force = self.op.force
2127

    
2128
    if self.op.master_candidate == False:
2129
      if self.op.node_name == self.cfg.GetMasterNode():
2130
        raise errors.OpPrereqError("The master node has to be a"
2131
                                   " master candidate")
2132
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2133
      node_info = self.cfg.GetAllNodesInfo().values()
2134
      num_candidates = len([node for node in node_info
2135
                            if node.master_candidate])
2136
      if num_candidates <= cp_size:
2137
        msg = ("Not enough master candidates (desired"
2138
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2139
        if force:
2140
          self.LogWarning(msg)
2141
        else:
2142
          raise errors.OpPrereqError(msg)
2143

    
2144
    return
2145

    
2146
  def Exec(self, feedback_fn):
2147
    """Modifies a node.
2148

2149
    """
2150
    node = self.cfg.GetNodeInfo(self.op.node_name)
2151

    
2152
    result = []
2153

    
2154
    if self.op.master_candidate is not None:
2155
      node.master_candidate = self.op.master_candidate
2156
      result.append(("master_candidate", str(self.op.master_candidate)))
2157

    
2158
    # this will trigger configuration file update, if needed
2159
    self.cfg.Update(node)
2160
    # this will trigger job queue propagation or cleanup
2161
    if self.op.node_name != self.cfg.GetMasterNode():
2162
      self.context.ReaddNode(node)
2163

    
2164
    return result
2165

    
2166

    
2167
class LUQueryClusterInfo(NoHooksLU):
2168
  """Query cluster configuration.
2169

2170
  """
2171
  _OP_REQP = []
2172
  REQ_BGL = False
2173

    
2174
  def ExpandNames(self):
2175
    self.needed_locks = {}
2176

    
2177
  def CheckPrereq(self):
2178
    """No prerequsites needed for this LU.
2179

2180
    """
2181
    pass
2182

    
2183
  def Exec(self, feedback_fn):
2184
    """Return cluster config.
2185

2186
    """
2187
    cluster = self.cfg.GetClusterInfo()
2188
    result = {
2189
      "software_version": constants.RELEASE_VERSION,
2190
      "protocol_version": constants.PROTOCOL_VERSION,
2191
      "config_version": constants.CONFIG_VERSION,
2192
      "os_api_version": constants.OS_API_VERSION,
2193
      "export_version": constants.EXPORT_VERSION,
2194
      "architecture": (platform.architecture()[0], platform.machine()),
2195
      "name": cluster.cluster_name,
2196
      "master": cluster.master_node,
2197
      "default_hypervisor": cluster.default_hypervisor,
2198
      "enabled_hypervisors": cluster.enabled_hypervisors,
2199
      "hvparams": cluster.hvparams,
2200
      "beparams": cluster.beparams,
2201
      "candidate_pool_size": cluster.candidate_pool_size,
2202
      }
2203

    
2204
    return result
2205

    
2206

    
2207
class LUQueryConfigValues(NoHooksLU):
2208
  """Return configuration values.
2209

2210
  """
2211
  _OP_REQP = []
2212
  REQ_BGL = False
2213
  _FIELDS_DYNAMIC = utils.FieldSet()
2214
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2215

    
2216
  def ExpandNames(self):
2217
    self.needed_locks = {}
2218

    
2219
    _CheckOutputFields(static=self._FIELDS_STATIC,
2220
                       dynamic=self._FIELDS_DYNAMIC,
2221
                       selected=self.op.output_fields)
2222

    
2223
  def CheckPrereq(self):
2224
    """No prerequisites.
2225

2226
    """
2227
    pass
2228

    
2229
  def Exec(self, feedback_fn):
2230
    """Dump a representation of the cluster config to the standard output.
2231

2232
    """
2233
    values = []
2234
    for field in self.op.output_fields:
2235
      if field == "cluster_name":
2236
        entry = self.cfg.GetClusterName()
2237
      elif field == "master_node":
2238
        entry = self.cfg.GetMasterNode()
2239
      elif field == "drain_flag":
2240
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2241
      else:
2242
        raise errors.ParameterError(field)
2243
      values.append(entry)
2244
    return values
2245

    
2246

    
2247
class LUActivateInstanceDisks(NoHooksLU):
2248
  """Bring up an instance's disks.
2249

2250
  """
2251
  _OP_REQP = ["instance_name"]
2252
  REQ_BGL = False
2253

    
2254
  def ExpandNames(self):
2255
    self._ExpandAndLockInstance()
2256
    self.needed_locks[locking.LEVEL_NODE] = []
2257
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2258

    
2259
  def DeclareLocks(self, level):
2260
    if level == locking.LEVEL_NODE:
2261
      self._LockInstancesNodes()
2262

    
2263
  def CheckPrereq(self):
2264
    """Check prerequisites.
2265

2266
    This checks that the instance is in the cluster.
2267

2268
    """
2269
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2270
    assert self.instance is not None, \
2271
      "Cannot retrieve locked instance %s" % self.op.instance_name
2272

    
2273
  def Exec(self, feedback_fn):
2274
    """Activate the disks.
2275

2276
    """
2277
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2278
    if not disks_ok:
2279
      raise errors.OpExecError("Cannot activate block devices")
2280

    
2281
    return disks_info
2282

    
2283

    
2284
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2285
  """Prepare the block devices for an instance.
2286

2287
  This sets up the block devices on all nodes.
2288

2289
  @type lu: L{LogicalUnit}
2290
  @param lu: the logical unit on whose behalf we execute
2291
  @type instance: L{objects.Instance}
2292
  @param instance: the instance for whose disks we assemble
2293
  @type ignore_secondaries: boolean
2294
  @param ignore_secondaries: if true, errors on secondary nodes
2295
      won't result in an error return from the function
2296
  @return: a tuple of (disks_ok, device_info), where device_info is a list of
2297
      (host, instance_visible_name, node_visible_name) tuples
2298
      with the mapping from node devices to instance devices
2299

2300
  """
2301
  device_info = []
2302
  disks_ok = True
2303
  iname = instance.name
2304
  # With the two-pass mechanism we try to reduce the window of
2305
  # opportunity for the race condition of switching DRBD to primary
2306
  # before handshaking occurred, but we do not eliminate it
2307

    
2308
  # The proper fix would be to wait (with some limits) until the
2309
  # connection has been made and drbd transitions from WFConnection
2310
  # into any other network-connected state (Connected, SyncTarget,
2311
  # SyncSource, etc.)
2312

    
2313
  # 1st pass, assemble on all nodes in secondary mode
2314
  for inst_disk in instance.disks:
2315
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2316
      lu.cfg.SetDiskID(node_disk, node)
2317
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2318
      if result.failed or not result.data:
2319
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2320
                           " (is_primary=False, pass=1)",
2321
                           inst_disk.iv_name, node)
2322
        if not ignore_secondaries:
2323
          disks_ok = False
2324

    
2325
  # FIXME: race condition on drbd migration to primary
2326

    
2327
  # 2nd pass, do only the primary node
2328
  for inst_disk in instance.disks:
2329
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2330
      if node != instance.primary_node:
2331
        continue
2332
      lu.cfg.SetDiskID(node_disk, node)
2333
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2334
      if result.failed or not result.data:
2335
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2336
                           " (is_primary=True, pass=2)",
2337
                           inst_disk.iv_name, node)
2338
        disks_ok = False
2339
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2340

    
2341
  # leave the disks configured for the primary node
2342
  # this is a workaround that would be fixed better by
2343
  # improving the logical/physical id handling
2344
  for disk in instance.disks:
2345
    lu.cfg.SetDiskID(disk, instance.primary_node)
2346

    
2347
  return disks_ok, device_info
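# Illustrative usage sketch (not part of the original module): a caller that
# wants to report the node-to-instance device mapping could consume the second
# element of the return value (feedback_fn is the usual LU feedback callback):
#
#   disks_ok, device_info = _AssembleInstanceDisks(self, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node_name, iv_name, node_dev in device_info:
#     feedback_fn("  %s: disk %s assembled as %s" %
#                 (node_name, iv_name, node_dev))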
2348

    
2349

    
2350
def _StartInstanceDisks(lu, instance, force):
2351
  """Start the disks of an instance.
2352

2353
  """
2354
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2355
                                           ignore_secondaries=force)
2356
  if not disks_ok:
2357
    _ShutdownInstanceDisks(lu, instance)
2358
    if force is not None and not force:
2359
      lu.proc.LogWarning("", hint="If the message above refers to a"
2360
                         " secondary node,"
2361
                         " you can retry the operation using '--force'.")
2362
    raise errors.OpExecError("Disk consistency error")
2363

    
2364

    
2365
class LUDeactivateInstanceDisks(NoHooksLU):
2366
  """Shutdown an instance's disks.
2367

2368
  """
2369
  _OP_REQP = ["instance_name"]
2370
  REQ_BGL = False
2371

    
2372
  def ExpandNames(self):
2373
    self._ExpandAndLockInstance()
2374
    self.needed_locks[locking.LEVEL_NODE] = []
2375
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2376

    
2377
  def DeclareLocks(self, level):
2378
    if level == locking.LEVEL_NODE:
2379
      self._LockInstancesNodes()
2380

    
2381
  def CheckPrereq(self):
2382
    """Check prerequisites.
2383

2384
    This checks that the instance is in the cluster.
2385

2386
    """
2387
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2388
    assert self.instance is not None, \
2389
      "Cannot retrieve locked instance %s" % self.op.instance_name
2390

    
2391
  def Exec(self, feedback_fn):
2392
    """Deactivate the disks
2393

2394
    """
2395
    instance = self.instance
2396
    _SafeShutdownInstanceDisks(self, instance)
2397

    
2398

    
2399
def _SafeShutdownInstanceDisks(lu, instance):
2400
  """Shutdown block devices of an instance.
2401

2402
  This function checks if an instance is running, before calling
2403
  _ShutdownInstanceDisks.
2404

2405
  """
2406
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2407
                                      [instance.hypervisor])
2408
  ins_l = ins_l[instance.primary_node]
2409
  if ins_l.failed or not isinstance(ins_l.data, list):
2410
    raise errors.OpExecError("Can't contact node '%s'" %
2411
                             instance.primary_node)
2412

    
2413
  if instance.name in ins_l.data:
2414
    raise errors.OpExecError("Instance is running, can't shutdown"
2415
                             " block devices.")
2416

    
2417
  _ShutdownInstanceDisks(lu, instance)
2418

    
2419

    
2420
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2421
  """Shutdown block devices of an instance.
2422

2423
  This does the shutdown on all nodes of the instance.
2424

2425
  If ignore_primary is true, errors on the primary node are
2426
  ignored.
2427

2428
  """
2429
  result = True
2430
  for disk in instance.disks:
2431
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2432
      lu.cfg.SetDiskID(top_disk, node)
2433
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2434
      if result.failed or not result.data:
2435
        logging.error("Could not shutdown block device %s on node %s",
2436
                      disk.iv_name, node)
2437
        if not ignore_primary or node != instance.primary_node:
2438
          result = False
2439
  return result
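# Illustrative usage sketch (not part of the original module): cleanup code
# that must proceed even when the primary node is unreachable (for example
# during a failover) could ignore shutdown errors on the primary:
#
#   if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
#     self.proc.LogWarning("Could not shut down all of the instance's disks")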
2440

    
2441

    
2442
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2443
  """Checks if a node has enough free memory.
2444

2445
  This function checks if a given node has the needed amount of free
2446
  memory. In case the node has less memory or we cannot get the
2447
  information from the node, this function raises an OpPrereqError
2448
  exception.
2449

2450
  @type lu: C{LogicalUnit}
2451
  @param lu: a logical unit from which we get configuration data
2452
  @type node: C{str}
2453
  @param node: the node to check
2454
  @type reason: C{str}
2455
  @param reason: string to use in the error message
2456
  @type requested: C{int}
2457
  @param requested: the amount of memory in MiB to check for
2458
  @type hypervisor: C{str}
2459
  @param hypervisor: the hypervisor to ask for memory stats
2460
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2461
      we cannot check the node
2462

2463
  """
2464
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2465
  nodeinfo[node].Raise()
2466
  free_mem = nodeinfo[node].data.get('memory_free')
2467
  if not isinstance(free_mem, int):
2468
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2469
                             " was '%s'" % (node, free_mem))
2470
  if requested > free_mem:
2471
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2472
                             " needed %s MiB, available %s MiB" %
2473
                             (node, reason, requested, free_mem))
2474

    
2475

    
2476
class LUStartupInstance(LogicalUnit):
2477
  """Starts an instance.
2478

2479
  """
2480
  HPATH = "instance-start"
2481
  HTYPE = constants.HTYPE_INSTANCE
2482
  _OP_REQP = ["instance_name", "force"]
2483
  REQ_BGL = False
2484

    
2485
  def ExpandNames(self):
2486
    self._ExpandAndLockInstance()
2487

    
2488
  def BuildHooksEnv(self):
2489
    """Build hooks env.
2490

2491
    This runs on master, primary and secondary nodes of the instance.
2492

2493
    """
2494
    env = {
2495
      "FORCE": self.op.force,
2496
      }
2497
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2498
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2499
          list(self.instance.secondary_nodes))
2500
    return env, nl, nl
2501

    
2502
  def CheckPrereq(self):
2503
    """Check prerequisites.
2504

2505
    This checks that the instance is in the cluster.
2506

2507
    """
2508
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2509
    assert self.instance is not None, \
2510
      "Cannot retrieve locked instance %s" % self.op.instance_name
2511

    
2512
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2513
    # check bridges existence
2514
    _CheckInstanceBridgesExist(self, instance)
2515

    
2516
    _CheckNodeFreeMemory(self, instance.primary_node,
2517
                         "starting instance %s" % instance.name,
2518
                         bep[constants.BE_MEMORY], instance.hypervisor)
2519

    
2520
  def Exec(self, feedback_fn):
2521
    """Start the instance.
2522

2523
    """
2524
    instance = self.instance
2525
    force = self.op.force
2526
    extra_args = getattr(self.op, "extra_args", "")
2527

    
2528
    self.cfg.MarkInstanceUp(instance.name)
2529

    
2530
    node_current = instance.primary_node
2531

    
2532
    _StartInstanceDisks(self, instance, force)
2533

    
2534
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2535
    if result.failed or not result.data:
2536
      _ShutdownInstanceDisks(self, instance)
2537
      raise errors.OpExecError("Could not start instance")
2538

    
2539

    
2540
class LURebootInstance(LogicalUnit):
2541
  """Reboot an instance.
2542

2543
  """
2544
  HPATH = "instance-reboot"
2545
  HTYPE = constants.HTYPE_INSTANCE
2546
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2547
  REQ_BGL = False
2548

    
2549
  def ExpandNames(self):
2550
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2551
                                   constants.INSTANCE_REBOOT_HARD,
2552
                                   constants.INSTANCE_REBOOT_FULL]:
2553
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2554
                                  (constants.INSTANCE_REBOOT_SOFT,
2555
                                   constants.INSTANCE_REBOOT_HARD,
2556
                                   constants.INSTANCE_REBOOT_FULL))
2557
    self._ExpandAndLockInstance()
2558

    
2559
  def BuildHooksEnv(self):
2560
    """Build hooks env.
2561

2562
    This runs on master, primary and secondary nodes of the instance.
2563

2564
    """
2565
    env = {
2566
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2567
      }
2568
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2569
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2570
          list(self.instance.secondary_nodes))
2571
    return env, nl, nl
2572

    
2573
  def CheckPrereq(self):
2574
    """Check prerequisites.
2575

2576
    This checks that the instance is in the cluster.
2577

2578
    """
2579
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2580
    assert self.instance is not None, \
2581
      "Cannot retrieve locked instance %s" % self.op.instance_name
2582

    
2583
    # check bridges existence
2584
    _CheckInstanceBridgesExist(self, instance)
2585

    
2586
  def Exec(self, feedback_fn):
2587
    """Reboot the instance.
2588

2589
    """
2590
    instance = self.instance
2591
    ignore_secondaries = self.op.ignore_secondaries
2592
    reboot_type = self.op.reboot_type
2593
    extra_args = getattr(self.op, "extra_args", "")
2594

    
2595
    node_current = instance.primary_node
2596

    
2597
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2598
                       constants.INSTANCE_REBOOT_HARD]:
2599
      result = self.rpc.call_instance_reboot(node_current, instance,
2600
                                             reboot_type, extra_args)
2601
      if result.failed or not result.data:
2602
        raise errors.OpExecError("Could not reboot instance")
2603
    else:
2604
      result = self.rpc.call_instance_shutdown(node_current, instance)
      if result.failed or not result.data:
2605
        raise errors.OpExecError("could not shutdown instance for full reboot")
2606
      _ShutdownInstanceDisks(self, instance)
2607
      _StartInstanceDisks(self, instance, ignore_secondaries)
2608
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2609
      if result.failed or not result.data:
2610
        _ShutdownInstanceDisks(self, instance)
2611
        raise errors.OpExecError("Could not start instance for full reboot")
2612

    
2613
    self.cfg.MarkInstanceUp(instance.name)
2614

    
2615

    
2616
class LUShutdownInstance(LogicalUnit):
2617
  """Shutdown an instance.
2618

2619
  """
2620
  HPATH = "instance-stop"
2621
  HTYPE = constants.HTYPE_INSTANCE
2622
  _OP_REQP = ["instance_name"]
2623
  REQ_BGL = False
2624

    
2625
  def ExpandNames(self):
2626
    self._ExpandAndLockInstance()
2627

    
2628
  def BuildHooksEnv(self):
2629
    """Build hooks env.
2630

2631
    This runs on master, primary and secondary nodes of the instance.
2632

2633
    """
2634
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2635
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2636
          list(self.instance.secondary_nodes))
2637
    return env, nl, nl
2638

    
2639
  def CheckPrereq(self):
2640
    """Check prerequisites.
2641

2642
    This checks that the instance is in the cluster.
2643

2644
    """
2645
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2646
    assert self.instance is not None, \
2647
      "Cannot retrieve locked instance %s" % self.op.instance_name
2648

    
2649
  def Exec(self, feedback_fn):
2650
    """Shutdown the instance.
2651

2652
    """
2653
    instance = self.instance
2654
    node_current = instance.primary_node
2655
    self.cfg.MarkInstanceDown(instance.name)
2656
    result = self.rpc.call_instance_shutdown(node_current, instance)
2657
    if result.failed or not result.data:
2658
      self.proc.LogWarning("Could not shutdown instance")
2659

    
2660
    _ShutdownInstanceDisks(self, instance)
2661

    
2662

    
2663
class LUReinstallInstance(LogicalUnit):
2664
  """Reinstall an instance.
2665

2666
  """
2667
  HPATH = "instance-reinstall"
2668
  HTYPE = constants.HTYPE_INSTANCE
2669
  _OP_REQP = ["instance_name"]
2670
  REQ_BGL = False
2671

    
2672
  def ExpandNames(self):
2673
    self._ExpandAndLockInstance()
2674

    
2675
  def BuildHooksEnv(self):
2676
    """Build hooks env.
2677

2678
    This runs on master, primary and secondary nodes of the instance.
2679

2680
    """
2681
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2682
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2683
          list(self.instance.secondary_nodes))
2684
    return env, nl, nl
2685

    
2686
  def CheckPrereq(self):
2687
    """Check prerequisites.
2688

2689
    This checks that the instance is in the cluster and is not running.
2690

2691
    """
2692
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2693
    assert instance is not None, \
2694
      "Cannot retrieve locked instance %s" % self.op.instance_name
2695

    
2696
    if instance.disk_template == constants.DT_DISKLESS:
2697
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2698
                                 self.op.instance_name)
2699
    if instance.status != "down":
2700
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2701
                                 self.op.instance_name)
2702
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2703
                                              instance.name,
2704
                                              instance.hypervisor)
2705
    if remote_info.failed or remote_info.data:
2706
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2707
                                 (self.op.instance_name,
2708
                                  instance.primary_node))
2709

    
2710
    self.op.os_type = getattr(self.op, "os_type", None)
2711
    if self.op.os_type is not None:
2712
      # OS verification
2713
      pnode = self.cfg.GetNodeInfo(
2714
        self.cfg.ExpandNodeName(instance.primary_node))
2715
      if pnode is None:
2716
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2717
                                   self.op.pnode)
2718
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2719
      result.Raise()
2720
      if not isinstance(result.data, objects.OS):
2721
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2722
                                   " primary node"  % self.op.os_type)
2723

    
2724
    self.instance = instance
2725

    
2726
  def Exec(self, feedback_fn):
2727
    """Reinstall the instance.
2728

2729
    """
2730
    inst = self.instance
2731

    
2732
    if self.op.os_type is not None:
2733
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2734
      inst.os = self.op.os_type
2735
      self.cfg.Update(inst)
2736

    
2737
    _StartInstanceDisks(self, inst, None)
2738
    try:
2739
      feedback_fn("Running the instance OS create scripts...")
2740
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2741
      result.Raise()
2742
      if not result.data:
2743
        raise errors.OpExecError("Could not install OS for instance %s"
2744
                                 " on node %s" %
2745
                                 (inst.name, inst.primary_node))
2746
    finally:
2747
      _ShutdownInstanceDisks(self, inst)
2748

    
2749

    
2750
class LURenameInstance(LogicalUnit):
2751
  """Rename an instance.
2752

2753
  """
2754
  HPATH = "instance-rename"
2755
  HTYPE = constants.HTYPE_INSTANCE
2756
  _OP_REQP = ["instance_name", "new_name"]
2757

    
2758
  def BuildHooksEnv(self):
2759
    """Build hooks env.
2760

2761
    This runs on master, primary and secondary nodes of the instance.
2762

2763
    """
2764
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2765
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2766
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2767
          list(self.instance.secondary_nodes))
2768
    return env, nl, nl
2769

    
2770
  def CheckPrereq(self):
2771
    """Check prerequisites.
2772

2773
    This checks that the instance is in the cluster and is not running.
2774

2775
    """
2776
    instance = self.cfg.GetInstanceInfo(
2777
      self.cfg.ExpandInstanceName(self.op.instance_name))
2778
    if instance is None:
2779
      raise errors.OpPrereqError("Instance '%s' not known" %
2780
                                 self.op.instance_name)
2781
    if instance.status != "down":
2782
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2783
                                 self.op.instance_name)
2784
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2785
                                              instance.name,
2786
                                              instance.hypervisor)
2787
    remote_info.Raise()
2788
    if remote_info.data:
2789
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2790
                                 (self.op.instance_name,
2791
                                  instance.primary_node))
2792
    self.instance = instance
2793

    
2794
    # new name verification
2795
    name_info = utils.HostInfo(self.op.new_name)
2796

    
2797
    self.op.new_name = new_name = name_info.name
2798
    instance_list = self.cfg.GetInstanceList()
2799
    if new_name in instance_list:
2800
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2801
                                 new_name)
2802

    
2803
    if not getattr(self.op, "ignore_ip", False):
2804
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2805
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2806
                                   (name_info.ip, new_name))
2807

    
2808

    
2809
  def Exec(self, feedback_fn):
2810
    """Reinstall the instance.
2811

2812
    """
2813
    inst = self.instance
2814
    old_name = inst.name
2815

    
2816
    if inst.disk_template == constants.DT_FILE:
2817
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2818

    
2819
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2820
    # Change the instance lock. This is definitely safe while we hold the BGL
2821
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2822
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2823

    
2824
    # re-read the instance from the configuration after rename
2825
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2826

    
2827
    if inst.disk_template == constants.DT_FILE:
2828
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2829
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2830
                                                     old_file_storage_dir,
2831
                                                     new_file_storage_dir)
2832
      result.Raise()
2833
      if not result.data:
2834
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2835
                                 " directory '%s' to '%s' (but the instance"
2836
                                 " has been renamed in Ganeti)" % (
2837
                                 inst.primary_node, old_file_storage_dir,
2838
                                 new_file_storage_dir))
2839

    
2840
      if not result.data[0]:
2841
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2842
                                 " (but the instance has been renamed in"
2843
                                 " Ganeti)" % (old_file_storage_dir,
2844
                                               new_file_storage_dir))
2845

    
2846
    _StartInstanceDisks(self, inst, None)
2847
    try:
2848
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2849
                                                 old_name)
2850
      if result.failed or not result.data:
2851
        msg = ("Could not run OS rename script for instance %s on node %s"
2852
               " (but the instance has been renamed in Ganeti)" %
2853
               (inst.name, inst.primary_node))
2854
        self.proc.LogWarning(msg)
2855
    finally:
2856
      _ShutdownInstanceDisks(self, inst)
2857

    
2858

    
2859
class LURemoveInstance(LogicalUnit):
2860
  """Remove an instance.
2861

2862
  """
2863
  HPATH = "instance-remove"
2864
  HTYPE = constants.HTYPE_INSTANCE
2865
  _OP_REQP = ["instance_name", "ignore_failures"]
2866
  REQ_BGL = False
2867

    
2868
  def ExpandNames(self):
2869
    self._ExpandAndLockInstance()
2870
    self.needed_locks[locking.LEVEL_NODE] = []
2871
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2872

    
2873
  def DeclareLocks(self, level):
2874
    if level == locking.LEVEL_NODE:
2875
      self._LockInstancesNodes()
2876

    
2877
  def BuildHooksEnv(self):
2878
    """Build hooks env.
2879

2880
    This runs on master, primary and secondary nodes of the instance.
2881

2882
    """
2883
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2884
    nl = [self.cfg.GetMasterNode()]
2885
    return env, nl, nl
2886

    
2887
  def CheckPrereq(self):
2888
    """Check prerequisites.
2889

2890
    This checks that the instance is in the cluster.
2891

2892
    """
2893
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2894
    assert self.instance is not None, \
2895
      "Cannot retrieve locked instance %s" % self.op.instance_name
2896

    
2897
  def Exec(self, feedback_fn):
2898
    """Remove the instance.
2899

2900
    """
2901
    instance = self.instance
2902
    logging.info("Shutting down instance %s on node %s",
2903
                 instance.name, instance.primary_node)
2904

    
2905
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2906
    if result.failed or not result.data:
2907
      if self.op.ignore_failures:
2908
        feedback_fn("Warning: can't shutdown instance")
2909
      else:
2910
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2911
                                 (instance.name, instance.primary_node))
2912

    
2913
    logging.info("Removing block devices for instance %s", instance.name)
2914

    
2915
    if not _RemoveDisks(self, instance):
2916
      if self.op.ignore_failures:
2917
        feedback_fn("Warning: can't remove instance's disks")
2918
      else:
2919
        raise errors.OpExecError("Can't remove instance's disks")
2920

    
2921
    logging.info("Removing instance %s out of cluster config", instance.name)
2922

    
2923
    self.cfg.RemoveInstance(instance.name)
2924
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2925

    
2926

    
2927
class LUQueryInstances(NoHooksLU):
2928
  """Logical unit for querying instances.
2929

2930
  """
2931
  _OP_REQP = ["output_fields", "names"]
2932
  REQ_BGL = False
2933
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2934
                                    "admin_state", "admin_ram",
2935
                                    "disk_template", "ip", "mac", "bridge",
2936
                                    "sda_size", "sdb_size", "vcpus", "tags",
2937
                                    "network_port", "beparams",
2938
                                    "(disk).(size)/([0-9]+)",
2939
                                    "(disk).(sizes)",
2940
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2941
                                    "(nic).(macs|ips|bridges)",
2942
                                    "(disk|nic).(count)",
2943
                                    "serial_no", "hypervisor", "hvparams",] +
2944
                                  ["hv/%s" % name
2945
                                   for name in constants.HVS_PARAMETERS] +
2946
                                  ["be/%s" % name
2947
                                   for name in constants.BES_PARAMETERS])
2948
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2949

    
2950

    
2951
  def ExpandNames(self):
2952
    _CheckOutputFields(static=self._FIELDS_STATIC,
2953
                       dynamic=self._FIELDS_DYNAMIC,
2954
                       selected=self.op.output_fields)
2955

    
2956
    self.needed_locks = {}
2957
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2958
    self.share_locks[locking.LEVEL_NODE] = 1
2959

    
2960
    if self.op.names:
2961
      self.wanted = _GetWantedInstances(self, self.op.names)
2962
    else:
2963
      self.wanted = locking.ALL_SET
2964

    
2965
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2966
    if self.do_locking:
2967
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2968
      self.needed_locks[locking.LEVEL_NODE] = []
2969
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2970

    
2971
  def DeclareLocks(self, level):
2972
    if level == locking.LEVEL_NODE and self.do_locking:
2973
      self._LockInstancesNodes()
2974

    
2975
  def CheckPrereq(self):
2976
    """Check prerequisites.
2977

2978
    """
2979
    pass
2980

    
2981
  def Exec(self, feedback_fn):
2982
    """Computes the list of nodes and their attributes.
2983

2984
    """
2985
    all_info = self.cfg.GetAllInstancesInfo()
2986
    if self.do_locking:
2987
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2988
    elif self.wanted != locking.ALL_SET:
2989
      instance_names = self.wanted
2990
      missing = set(instance_names).difference(all_info.keys())
2991
      if missing:
2992
        raise errors.OpExecError(
2993
          "Some instances were removed before retrieving their data: %s"
2994
          % missing)
2995
    else:
2996
      instance_names = all_info.keys()
2997

    
2998
    instance_names = utils.NiceSort(instance_names)
2999
    instance_list = [all_info[iname] for iname in instance_names]
3000

    
3001
    # begin data gathering
3002

    
3003
    nodes = frozenset([inst.primary_node for inst in instance_list])
3004
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3005

    
3006
    bad_nodes = []
3007
    if self.do_locking:
3008
      live_data = {}
3009
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3010
      for name in nodes:
3011
        result = node_data[name]
3012
        if result.failed:
3013
          bad_nodes.append(name)
3014
        else:
3015
          if result.data:
3016
            live_data.update(result.data)
3017
            # else no instance is alive
3018
    else:
3019
      live_data = dict([(name, {}) for name in instance_names])
3020

    
3021
    # end data gathering
3022

    
3023
    HVPREFIX = "hv/"
3024
    BEPREFIX = "be/"
3025
    output = []
3026
    for instance in instance_list:
3027
      iout = []
3028
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3029
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3030
      for field in self.op.output_fields:
3031
        st_match = self._FIELDS_STATIC.Matches(field)
3032
        if field == "name":
3033
          val = instance.name
3034
        elif field == "os":
3035
          val = instance.os
3036
        elif field == "pnode":
3037
          val = instance.primary_node
3038
        elif field == "snodes":
3039
          val = list(instance.secondary_nodes)
3040
        elif field == "admin_state":
3041
          val = (instance.status != "down")
3042
        elif field == "oper_state":
3043
          if instance.primary_node in bad_nodes:
3044
            val = None
3045
          else:
3046
            val = bool(live_data.get(instance.name))
3047
        elif field == "status":
3048
          if instance.primary_node in bad_nodes:
3049
            val = "ERROR_nodedown"
3050
          else:
3051
            running = bool(live_data.get(instance.name))
3052
            if running:
3053
              if instance.status != "down":
3054
                val = "running"
3055
              else:
3056
                val = "ERROR_up"
3057
            else:
3058
              if instance.status != "down":
3059
                val = "ERROR_down"
3060
              else:
3061
                val = "ADMIN_down"
3062
        elif field == "oper_ram":
3063
          if instance.primary_node in bad_nodes:
3064
            val = None
3065
          elif instance.name in live_data:
3066
            val = live_data[instance.name].get("memory", "?")
3067
          else:
3068
            val = "-"
3069
        elif field == "disk_template":
3070
          val = instance.disk_template
3071
        elif field == "ip":
3072
          val = instance.nics[0].ip
3073
        elif field == "bridge":
3074
          val = instance.nics[0].bridge
3075
        elif field == "mac":
3076
          val = instance.nics[0].mac
3077
        elif field == "sda_size" or field == "sdb_size":
3078
          idx = ord(field[2]) - ord('a')
3079
          try:
3080
            val = instance.FindDisk(idx).size
3081
          except errors.OpPrereqError:
3082
            val = None
3083
        elif field == "tags":
3084
          val = list(instance.GetTags())
3085
        elif field == "serial_no":
3086
          val = instance.serial_no
3087
        elif field == "network_port":
3088
          val = instance.network_port
3089
        elif field == "hypervisor":
3090
          val = instance.hypervisor
3091
        elif field == "hvparams":
3092
          val = i_hv
3093
        elif (field.startswith(HVPREFIX) and
3094
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3095
          val = i_hv.get(field[len(HVPREFIX):], None)
3096
        elif field == "beparams":
3097
          val = i_be
3098
        elif (field.startswith(BEPREFIX) and
3099
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3100
          val = i_be.get(field[len(BEPREFIX):], None)
3101
        elif st_match and st_match.groups():
3102
          # matches a variable list
3103
          st_groups = st_match.groups()
3104
          if st_groups and st_groups[0] == "disk":
3105
            if st_groups[1] == "count":
3106
              val = len(instance.disks)
3107
            elif st_groups[1] == "sizes":
3108
              val = [disk.size for disk in instance.disks]
3109
            elif st_groups[1] == "size":
3110
              try:
3111
                val = instance.FindDisk(st_groups[2]).size
3112
              except errors.OpPrereqError:
3113
                val = None
3114
            else:
3115
              assert False, "Unhandled disk parameter"
3116
          elif st_groups[0] == "nic":
3117
            if st_groups[1] == "count":
3118
              val = len(instance.nics)
3119
            elif st_groups[1] == "macs":
3120
              val = [nic.mac for nic in instance.nics]
3121
            elif st_groups[1] == "ips":
3122
              val = [nic.ip for nic in instance.nics]
3123
            elif st_groups[1] == "bridges":
3124
              val = [nic.bridge for nic in instance.nics]
3125
            else:
3126
              # index-based item
3127
              nic_idx = int(st_groups[2])
3128
              if nic_idx >= len(instance.nics):
3129
                val = None
3130
              else:
3131
                if st_groups[1] == "mac":
3132
                  val = instance.nics[nic_idx].mac
3133
                elif st_groups[1] == "ip":
3134
                  val = instance.nics[nic_idx].ip
3135
                elif st_groups[1] == "bridge":
3136
                  val = instance.nics[nic_idx].bridge
3137
                else:
3138
                  assert False, "Unhandled NIC parameter"
3139
          else:
3140
            assert False, "Unhandled variable parameter"
3141
        else:
3142
          raise errors.ParameterError(field)
3143
        iout.append(val)
3144
      output.append(iout)
3145

    
3146
    return output
3147

    
3148

    
3149
class LUFailoverInstance(LogicalUnit):
3150
  """Failover an instance.
3151

3152
  """
3153
  HPATH = "instance-failover"
3154
  HTYPE = constants.HTYPE_INSTANCE
3155
  _OP_REQP = ["instance_name", "ignore_consistency"]
3156
  REQ_BGL = False
3157

    
3158
  def ExpandNames(self):
3159
    self._ExpandAndLockInstance()
3160
    self.needed_locks[locking.LEVEL_NODE] = []
3161
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3162

    
3163
  def DeclareLocks(self, level):
3164
    if level == locking.LEVEL_NODE:
3165
      self._LockInstancesNodes()
3166

    
3167
  def BuildHooksEnv(self):
3168
    """Build hooks env.
3169

3170
    This runs on master, primary and secondary nodes of the instance.
3171

3172
    """
3173
    env = {
3174
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3175
      }
3176
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3177
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3178
    return env, nl, nl
3179

    
3180
  def CheckPrereq(self):
3181
    """Check prerequisites.
3182

3183
    This checks that the instance is in the cluster.
3184

3185
    """
3186
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3187
    assert self.instance is not None, \
3188
      "Cannot retrieve locked instance %s" % self.op.instance_name
3189

    
3190
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3191
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3192
      raise errors.OpPrereqError("Instance's disk layout is not"
3193
                                 " network mirrored, cannot failover.")
3194

    
3195
    secondary_nodes = instance.secondary_nodes
3196
    if not secondary_nodes:
3197
      raise errors.ProgrammerError("no secondary node but using "
3198
                                   "a mirrored disk template")
3199

    
3200
    target_node = secondary_nodes[0]
3201
    # check memory requirements on the secondary node
3202
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3203
                         instance.name, bep[constants.BE_MEMORY],
3204
                         instance.hypervisor)
3205

    
3206
    # check bridge existance
3207
    brlist = [nic.bridge for nic in instance.nics]
3208
    result = self.rpc.call_bridges_exist(target_node, brlist)
3209
    result.Raise()
3210
    if not result.data:
3211
      raise errors.OpPrereqError("One or more target bridges %s does not"
3212
                                 " exist on destination node '%s'" %
3213
                                 (brlist, target_node))
3214

    
3215
  def Exec(self, feedback_fn):
3216
    """Failover an instance.
3217

3218
    The failover is done by shutting it down on its present node and
3219
    starting it on the secondary.
3220

3221
    """
3222
    instance = self.instance
3223

    
3224
    source_node = instance.primary_node
3225
    target_node = instance.secondary_nodes[0]
3226

    
3227
    feedback_fn("* checking disk consistency between source and target")
3228
    for dev in instance.disks:
3229
      # for drbd, these are drbd over lvm
3230
      if not _CheckDiskConsistency(self, dev, target_node, False):
3231
        if instance.status == "up" and not self.op.ignore_consistency:
3232
          raise errors.OpExecError("Disk %s is degraded on target node,"
3233
                                   " aborting failover." % dev.iv_name)
3234

    
3235
    feedback_fn("* shutting down instance on source node")
3236
    logging.info("Shutting down instance %s on node %s",
3237
                 instance.name, source_node)
3238

    
3239
    result = self.rpc.call_instance_shutdown(source_node, instance)
3240
    if result.failed or not result.data:
3241
      if self.op.ignore_consistency:
3242
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3243
                             " Proceeding"
3244
                             " anyway. Please make sure node %s is down",
3245
                             instance.name, source_node, source_node)
3246
      else:
3247
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3248
                                 (instance.name, source_node))
3249

    
3250
    feedback_fn("* deactivating the instance's disks on source node")
3251
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3252
      raise errors.OpExecError("Can't shut down the instance's disks.")
3253

    
3254
    instance.primary_node = target_node
3255
    # distribute new instance config to the other nodes
3256
    self.cfg.Update(instance)
3257

    
3258
    # Only start the instance if it's marked as up
3259
    if instance.status == "up":
3260
      feedback_fn("* activating the instance's disks on target node")
3261
      logging.info("Starting instance %s on node %s",
3262
                   instance.name, target_node)
3263

    
3264
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3265
                                               ignore_secondaries=True)
3266
      if not disks_ok:
3267
        _ShutdownInstanceDisks(self, instance)
3268
        raise errors.OpExecError("Can't activate the instance's disks")
3269

    
3270
      feedback_fn("* starting the instance on the target node")
3271
      result = self.rpc.call_instance_start(target_node, instance, None)
3272
      if result.failed or not result.data:
3273
        _ShutdownInstanceDisks(self, instance)
3274
        raise errors.OpExecError("Could not start instance %s on node %s." %
3275
                                 (instance.name, target_node))
3276

    
3277

    
3278
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3279
  """Create a tree of block devices on the primary node.
3280

3281
  This always creates all devices.
3282

3283
  """
3284
  if device.children:
3285
    for child in device.children:
3286
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3287
        return False
3288

    
3289
  lu.cfg.SetDiskID(device, node)
3290
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3291
                                       instance.name, True, info)
3292
  if new_id.failed or not new_id.data:
3293
    return False
3294
  if device.physical_id is None:
3295
    device.physical_id = new_id
3296
  return True
3297

    
3298

    
3299
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3300
  """Create a tree of block devices on a secondary node.
3301

3302
  If this device type has to be created on secondaries, create it and
3303
  all its children.
3304

3305
  If not, just recurse to children keeping the same 'force' value.
3306

3307
  """
3308
  if device.CreateOnSecondary():
3309
    force = True
3310
  if device.children:
3311
    for child in device.children:
3312
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3313
                                        child, force, info):
3314
        return False
3315

    
3316
  if not force:
3317
    return True
3318
  lu.cfg.SetDiskID(device, node)
3319
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3320
                                       instance.name, False, info)
3321
  if new_id.failed or not new_id.data:
3322
    return False
3323
  if device.physical_id is None:
3324
    device.physical_id = new_id
3325
  return True
3326

    
3327

    
3328
def _GenerateUniqueNames(lu, exts):
3329
  """Generate a suitable LV name.
3330

3331
  This will generate a logical volume name for the given instance.
3332

3333
  """
3334
  results = []
3335
  for val in exts:
3336
    new_id = lu.cfg.GenerateUniqueID()
3337
    results.append("%s%s" % (new_id, val))
3338
  return results
3339

    
3340

    
3341
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3342
                         p_minor, s_minor):
3343
  """Generate a drbd8 device complete with its children.
3344

3345
  """
3346
  port = lu.cfg.AllocatePort()
3347
  vgname = lu.cfg.GetVGName()
3348
  shared_secret = lu.cfg.GenerateDRBDSecret()
3349
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3350
                          logical_id=(vgname, names[0]))
3351
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3352
                          logical_id=(vgname, names[1]))
3353
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3354
                          logical_id=(primary, secondary, port,
3355
                                      p_minor, s_minor,
3356
                                      shared_secret),
3357
                          children=[dev_data, dev_meta],
3358
                          iv_name=iv_name)
3359
  return drbd_dev
3360

    
3361

    
3362
def _GenerateDiskTemplate(lu, template_name,
3363
                          instance_name, primary_node,
3364
                          secondary_nodes, disk_info,
3365
                          file_storage_dir, file_driver,
3366
                          base_index):
3367
  """Generate the entire disk layout for a given template type.
3368

3369
  """
3370
  #TODO: compute space requirements
3371

    
3372
  vgname = lu.cfg.GetVGName()
3373
  disk_count = len(disk_info)
3374
  disks = []
3375
  if template_name == constants.DT_DISKLESS:
3376
    pass
3377
  elif template_name == constants.DT_PLAIN:
3378
    if len(secondary_nodes) != 0:
3379
      raise errors.ProgrammerError("Wrong template configuration")
3380

    
3381
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3382
                                      for i in range(disk_count)])
3383
    for idx, disk in enumerate(disk_info):
3384
      disk_index = idx + base_index
3385
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3386
                              logical_id=(vgname, names[idx]),
3387
                              iv_name="disk/%d" % disk_index)
3388
      disks.append(disk_dev)
3389
  elif template_name == constants.DT_DRBD8:
3390
    if len(secondary_nodes) != 1:
3391
      raise errors.ProgrammerError("Wrong template configuration")
3392
    remote_node = secondary_nodes[0]
3393
    minors = lu.cfg.AllocateDRBDMinor(
3394
      [primary_node, remote_node] * len(disk_info), instance_name)
3395

    
3396
    names = _GenerateUniqueNames(lu,
3397
                                 [".disk%d_%s" % (i, s)
3398
                                  for i in range(disk_count)
3399
                                  for s in ("data", "meta")
3400
                                  ])
3401
    for idx, disk in enumerate(disk_info):
3402
      disk_index = idx + base_index
3403
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3404
                                      disk["size"], names[idx*2:idx*2+2],
3405
                                      "disk/%d" % disk_index,
3406
                                      minors[idx*2], minors[idx*2+1])
3407
      disks.append(disk_dev)
3408
  elif template_name == constants.DT_FILE:
3409
    if len(secondary_nodes) != 0:
3410
      raise errors.ProgrammerError("Wrong template configuration")
3411

    
3412
    for idx, disk in enumerate(disk_info):
3413
      disk_index = idx + base_index
3414
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3415
                              iv_name="disk/%d" % disk_index,
3416
                              logical_id=(file_driver,
3417
                                          "%s/disk%d" % (file_storage_dir,
3418
                                                         idx)))
3419
      disks.append(disk_dev)
3420
  else:
3421
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3422
  return disks
3423

    
3424

    
3425
def _GetInstanceInfoText(instance):
3426
  """Compute that text that should be added to the disk's metadata.
3427

3428
  """
3429
  return "originstname+%s" % instance.name
3430

    
3431

    
3432
def _CreateDisks(lu, instance):
3433
  """Create all disks for an instance.
3434

3435
  This abstracts away some work from AddInstance.
3436

3437
  @type lu: L{LogicalUnit}
3438
  @param lu: the logical unit on whose behalf we execute
3439
  @type instance: L{objects.Instance}
3440
  @param instance: the instance whose disks we should create
3441
  @rtype: boolean
3442
  @return: the success of the creation
3443

3444
  """
3445
  info = _GetInstanceInfoText(instance)
3446

    
3447
  if instance.disk_template == constants.DT_FILE:
3448
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3449
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3450
                                                 file_storage_dir)
3451

    
3452
    if result.failed or not result.data:
3453
      logging.error("Could not connect to node '%s'", instance.primary_node)
3454
      return False
3455

    
3456
    if not result.data[0]:
3457
      logging.error("Failed to create directory '%s'", file_storage_dir)
3458
      return False
3459

    
3460
  # Note: this needs to be kept in sync with adding of disks in
3461
  # LUSetInstanceParams
3462
  for device in instance.disks:
3463
    logging.info("Creating volume %s for instance %s",
3464
                 device.iv_name, instance.name)
3465
    #HARDCODE
3466
    for secondary_node in instance.secondary_nodes:
3467
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3468
                                        device, False, info):
3469
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3470
                      device.iv_name, device, secondary_node)
3471
        return False
3472
    #HARDCODE
3473
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3474
                                    instance, device, info):
3475
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3476
      return False
3477

    
3478
  return True
3479

    
3480

    
3481
def _RemoveDisks(lu, instance):
3482
  """Remove all disks for an instance.
3483

3484
  This abstracts away some work from `AddInstance()` and
3485
  `RemoveInstance()`. Note that in case some of the devices couldn't
3486
  be removed, the removal will continue with the other ones (compare
3487
  with `_CreateDisks()`).
3488

3489
  @type lu: L{LogicalUnit}
3490
  @param lu: the logical unit on whose behalf we execute
3491
  @type instance: L{objects.Instance}
3492
  @param instance: the instance whose disks we should remove
3493
  @rtype: boolean
3494
  @return: the success of the removal
3495

3496
  """
3497
  logging.info("Removing block devices for instance %s", instance.name)
3498

    
3499
  result = True
3500
  for device in instance.disks:
3501
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3502
      lu.cfg.SetDiskID(disk, node)
3503
      result = lu.rpc.call_blockdev_remove(node, disk)
3504
      if result.failed or not result.data:
3505
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3506
                           " continuing anyway", device.iv_name, node)
3507
        result = False
3508

    
3509
  if instance.disk_template == constants.DT_FILE:
3510
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3511
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3512
                                                 file_storage_dir)
3513
    if result.failed or not result.data:
3514
      logging.error("Could not remove directory '%s'", file_storage_dir)
3515
      result = False
3516

    
3517
  return result
3518

    
3519

    
3520
def _ComputeDiskSize(disk_template, disks):
3521
  """Compute disk size requirements in the volume group
3522

3523
  """
3524
  # Required free disk space as a function of disk and swap space
3525
  req_size_dict = {
3526
    constants.DT_DISKLESS: None,
3527
    constants.DT_PLAIN: sum(d["size"] for d in disks),
3528
    # 128 MB are added for drbd metadata for each disk
3529
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3530
    constants.DT_FILE: None,
3531
  }
3532

    
3533
  if disk_template not in req_size_dict:
3534
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3535
                                 " is unknown" %  disk_template)
3536

    
3537
  return req_size_dict[disk_template]
3538

    
3539

    
3540
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3541
  """Hypervisor parameter validation.
3542

3543
  This function abstract the hypervisor parameter validation to be
3544
  used in both instance create and instance modify.
3545

3546
  @type lu: L{LogicalUnit}
3547
  @param lu: the logical unit for which we check
3548
  @type nodenames: list
3549
  @param nodenames: the list of nodes on which we should check
3550
  @type hvname: string
3551
  @param hvname: the name of the hypervisor we should use
3552
  @type hvparams: dict
3553
  @param hvparams: the parameters which we need to check
3554
  @raise errors.OpPrereqError: if the parameters are not valid
3555

3556
  """
3557
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3558
                                                  hvname,
3559
                                                  hvparams)
3560
  for node in nodenames:
3561
    info = hvinfo[node]
3562
    info.Raise()
3563
    if not info.data or not isinstance(info.data, (tuple, list)):
3564
      raise errors.OpPrereqError("Cannot get current information"
3565
                                 " from node '%s' (%s)" % (node, info.data))
3566
    if not info.data[0]:
3567
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3568
                                 " %s" % info.data[1])
3569

    
3570

    
3571
class LUCreateInstance(LogicalUnit):
3572
  """Create an instance.
3573

3574
  """
3575
  HPATH = "instance-add"
3576
  HTYPE = constants.HTYPE_INSTANCE
3577
  _OP_REQP = ["instance_name", "disks", "disk_template",
3578
              "mode", "start",
3579
              "wait_for_sync", "ip_check", "nics",
3580
              "hvparams", "beparams"]
3581
  REQ_BGL = False
3582

    
3583
  def _ExpandNode(self, node):
3584
    """Expands and checks one node name.
3585

3586
    """
3587
    node_full = self.cfg.ExpandNodeName(node)
3588
    if node_full is None:
3589
      raise errors.OpPrereqError("Unknown node %s" % node)
3590
    return node_full
3591

    
3592
  def ExpandNames(self):
3593
    """ExpandNames for CreateInstance.
3594

3595
    Figure out the right locks for instance creation.
3596

3597
    """
3598
    self.needed_locks = {}
3599

    
3600
    # set optional parameters to none if they don't exist
3601
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3602
      if not hasattr(self.op, attr):
3603
        setattr(self.op, attr, None)
3604

    
3605
    # cheap checks, mostly valid constants given
3606

    
3607
    # verify creation mode
3608
    if self.op.mode not in (constants.INSTANCE_CREATE,
3609
                            constants.INSTANCE_IMPORT):
3610
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3611
                                 self.op.mode)
3612

    
3613
    # disk template and mirror node verification
3614
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3615
      raise errors.OpPrereqError("Invalid disk template name")
3616

    
3617
    if self.op.hypervisor is None:
3618
      self.op.hypervisor = self.cfg.GetHypervisorType()
3619

    
3620
    cluster = self.cfg.GetClusterInfo()
3621
    enabled_hvs = cluster.enabled_hypervisors
3622
    if self.op.hypervisor not in enabled_hvs:
3623
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3624
                                 " cluster (%s)" % (self.op.hypervisor,
3625
                                  ",".join(enabled_hvs)))
3626

    
3627
    # check hypervisor parameter syntax (locally)
3628

    
3629
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3630
                                  self.op.hvparams)
3631
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3632
    hv_type.CheckParameterSyntax(filled_hvp)
3633

    
3634
    # fill and remember the beparams dict
3635
    utils.CheckBEParams(self.op.beparams)
3636
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3637
                                    self.op.beparams)
3638

    
3639
    #### instance parameters check
3640

    
3641
    # instance name verification
3642
    hostname1 = utils.HostInfo(self.op.instance_name)
3643
    self.op.instance_name = instance_name = hostname1.name
3644

    
3645
    # this is just a preventive check, but someone might still add this
3646
    # instance in the meantime, and creation will fail at lock-add time
3647
    if instance_name in self.cfg.GetInstanceList():
3648
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3649
                                 instance_name)
3650

    
3651
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3652

    
3653
    # NIC buildup
3654
    self.nics = []
3655
    for nic in self.op.nics:
3656
      # ip validity checks
3657
      ip = nic.get("ip", None)
3658
      if ip is None or ip.lower() == "none":
3659
        nic_ip = None
3660
      elif ip.lower() == constants.VALUE_AUTO:
3661
        nic_ip = hostname1.ip
3662
      else:
3663
        if not utils.IsValidIP(ip):
3664
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3665
                                     " like a valid IP" % ip)
3666
        nic_ip = ip
3667

    
3668
      # MAC address verification
3669
      mac = nic.get("mac", constants.VALUE_AUTO)
3670
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3671
        if not utils.IsValidMac(mac.lower()):
3672
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3673
                                     mac)
3674
      # bridge verification
3675
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3676
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3677

    
3678
    # disk checks/pre-build
3679
    self.disks = []
3680
    for disk in self.op.disks:
3681
      mode = disk.get("mode", constants.DISK_RDWR)
3682
      if mode not in constants.DISK_ACCESS_SET:
3683
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3684
                                   mode)
3685
      size = disk.get("size", None)
3686
      if size is None:
3687
        raise errors.OpPrereqError("Missing disk size")
3688
      try:
3689
        size = int(size)
3690
      except ValueError:
3691
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3692
      self.disks.append({"size": size, "mode": mode})
3693

    
3694
    # used in CheckPrereq for ip ping check
3695
    self.check_ip = hostname1.ip
3696

    
3697
    # file storage checks
3698
    if (self.op.file_driver and
3699
        not self.op.file_driver in constants.FILE_DRIVER):
3700
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3701
                                 self.op.file_driver)
3702

    
3703
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3704
      raise errors.OpPrereqError("File storage directory path not absolute")
3705

    
3706
    ### Node/iallocator related checks
3707
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3708
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3709
                                 " node must be given")
3710

    
3711
    if self.op.iallocator:
3712
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3713
    else:
3714
      self.op.pnode = self._ExpandNode(self.op.pnode)
3715
      nodelist = [self.op.pnode]
3716
      if self.op.snode is not None:
3717
        self.op.snode = self._ExpandNode(self.op.snode)
3718
        nodelist.append(self.op.snode)
3719
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3720

    
3721
    # in case of import lock the source node too
3722
    if self.op.mode == constants.INSTANCE_IMPORT:
3723
      src_node = getattr(self.op, "src_node", None)
3724
      src_path = getattr(self.op, "src_path", None)
3725

    
3726
      if src_path is None:
3727
        self.op.src_path = src_path = self.op.instance_name
3728

    
3729
      if src_node is None:
3730
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3731
        self.op.src_node = None
3732
        if os.path.isabs(src_path):
3733
          raise errors.OpPrereqError("Importing an instance from an absolute"
3734
                                     " path requires a source node option.")
3735
      else:
3736
        self.op.src_node = src_node = self._ExpandNode(src_node)
3737
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3738
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
3739
        if not os.path.isabs(src_path):
3740
          self.op.src_path = src_path = \
3741
            os.path.join(constants.EXPORT_DIR, src_path)
3742

    
3743
    else: # INSTANCE_CREATE
3744
      if getattr(self.op, "os_type", None) is None:
3745
        raise errors.OpPrereqError("No guest OS specified")
3746

    
3747
  def _RunAllocator(self):
3748
    """Run the allocator based on input opcode.
3749

3750
    """
3751
    nics = [n.ToDict() for n in self.nics]
3752
    ial = IAllocator(self,
3753
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3754
                     name=self.op.instance_name,
3755
                     disk_template=self.op.disk_template,
3756
                     tags=[],
3757
                     os=self.op.os_type,
3758
                     vcpus=self.be_full[constants.BE_VCPUS],
3759
                     mem_size=self.be_full[constants.BE_MEMORY],
3760
                     disks=self.disks,
3761
                     nics=nics,
3762
                     hypervisor=self.op.hypervisor,
3763
                     )
3764

    
3765
    ial.Run(self.op.iallocator)
3766

    
3767
    if not ial.success:
3768
      raise errors.OpPrereqError("Can't compute nodes using"
3769
                                 " iallocator '%s': %s" % (self.op.iallocator,
3770
                                                           ial.info))
3771
    if len(ial.nodes) != ial.required_nodes:
3772
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3773
                                 " of nodes (%s), required %s" %
3774
                                 (self.op.iallocator, len(ial.nodes),
3775
                                  ial.required_nodes))
3776
    self.op.pnode = ial.nodes[0]
3777
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3778
                 self.op.instance_name, self.op.iallocator,
3779
                 ", ".join(ial.nodes))
3780
    if ial.required_nodes == 2:
3781
      self.op.snode = ial.nodes[1]
3782

    
3783
  def BuildHooksEnv(self):
3784
    """Build hooks env.
3785

3786
    This runs on master, primary and secondary nodes of the instance.
3787

3788
    """
3789
    env = {
3790
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3791
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3792
      "INSTANCE_ADD_MODE": self.op.mode,
3793
      }
3794
    if self.op.mode == constants.INSTANCE_IMPORT:
3795
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3796
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3797
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3798

    
3799
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3800
      primary_node=self.op.pnode,
3801
      secondary_nodes=self.secondaries,
3802
      status=self.instance_status,
3803
      os_type=self.op.os_type,
3804
      memory=self.be_full[constants.BE_MEMORY],
3805
      vcpus=self.be_full[constants.BE_VCPUS],
3806
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3807
    ))
3808

    
3809
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3810
          self.secondaries)
3811
    return env, nl, nl
3812

    
3813

    
3814
  def CheckPrereq(self):
3815
    """Check prerequisites.
3816

3817
    """
3818
    if (not self.cfg.GetVGName() and
3819
        self.op.disk_template not in constants.DTS_NOT_LVM):
3820
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3821
                                 " instances")
3822

    
3823

    
3824
    if self.op.mode == constants.INSTANCE_IMPORT:
3825
      src_node = self.op.src_node
3826
      src_path = self.op.src_path
3827

    
3828
      if src_node is None:
3829
        exp_list = self.rpc.call_export_list(
3830
          self.acquired_locks[locking.LEVEL_NODE])
3831
        found = False
3832
        for node in exp_list:
3833
          if not exp_list[node].failed and src_path in exp_list[node].data:
3834
            found = True
3835
            self.op.src_node = src_node = node
3836
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3837
                                                       src_path)
3838
            break
3839
        if not found:
3840
          raise errors.OpPrereqError("No export found for relative path %s" %
3841
                                      src_path)
3842

    
3843
      result = self.rpc.call_export_info(src_node, src_path)
3844
      result.Raise()
3845
      if not result.data:
3846
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3847

    
3848
      export_info = result.data
3849
      if not export_info.has_section(constants.INISECT_EXP):
3850
        raise errors.ProgrammerError("Corrupted export config")
3851

    
3852
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3853
      if (int(ei_version) != constants.EXPORT_VERSION):
3854
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3855
                                   (ei_version, constants.EXPORT_VERSION))
3856

    
3857
      # Check that the new instance doesn't have less disks than the export
3858
      instance_disks = len(self.disks)
3859
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3860
      if instance_disks < export_disks:
3861
        raise errors.OpPrereqError("Not enough disks to import."
3862
                                   " (instance: %d, export: %d)" %
3863
                                   (instance_disks, export_disks))
3864

    
3865
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3866
      disk_images = []
3867
      for idx in range(export_disks):
3868
        option = 'disk%d_dump' % idx
3869
        if export_info.has_option(constants.INISECT_INS, option):
3870
          # FIXME: are the old os-es, disk sizes, etc. useful?
3871
          export_name = export_info.get(constants.INISECT_INS, option)
3872
          image = os.path.join(src_path, export_name)
3873
          disk_images.append(image)
3874
        else:
3875
          disk_images.append(False)
3876

    
3877
      self.src_images = disk_images
3878

    
3879
      old_name = export_info.get(constants.INISECT_INS, 'name')
3880
      # FIXME: int() here could throw a ValueError on broken exports
3881
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3882
      if self.op.instance_name == old_name:
3883
        for idx, nic in enumerate(self.nics):
3884
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3885
            nic_mac_ini = 'nic%d_mac' % idx
3886
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3887

    
3888
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3889
    if self.op.start and not self.op.ip_check:
3890
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3891
                                 " adding an instance in start mode")
3892

    
3893
    if self.op.ip_check:
3894
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3895
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3896
                                   (self.check_ip, self.op.instance_name))
3897

    
3898
    #### allocator run
3899

    
3900
    if self.op.iallocator is not None:
3901
      self._RunAllocator()
3902

    
3903
    #### node related checks
3904

    
3905
    # check primary node
3906
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3907
    assert self.pnode is not None, \
3908
      "Cannot retrieve locked node %s" % self.op.pnode
3909
    self.secondaries = []
3910

    
3911
    # mirror node verification
3912
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3913
      if self.op.snode is None:
3914
        raise errors.OpPrereqError("The networked disk templates need"
3915
                                   " a mirror node")
3916
      if self.op.snode == pnode.name:
3917
        raise errors.OpPrereqError("The secondary node cannot be"
3918
                                   " the primary node.")
3919
      self.secondaries.append(self.op.snode)
3920

    
3921
    nodenames = [pnode.name] + self.secondaries
3922

    
3923
    req_size = _ComputeDiskSize(self.op.disk_template,
3924
                                self.disks)
3925

    
3926
    # Check lv size requirements
3927
    if req_size is not None:
3928
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3929
                                         self.op.hypervisor)
3930
      for node in nodenames:
3931
        info = nodeinfo[node]
3932
        info.Raise()
3933
        info = info.data
3934
        if not info:
3935
          raise errors.OpPrereqError("Cannot get current information"
3936
                                     " from node '%s'" % node)
3937
        vg_free = info.get('vg_free', None)
3938
        if not isinstance(vg_free, int):
3939
          raise errors.OpPrereqError("Can't compute free disk space on"
3940
                                     " node %s" % node)
3941
        if req_size > info['vg_free']:
3942
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3943
                                     " %d MB available, %d MB required" %
3944
                                     (node, info['vg_free'], req_size))
3945

    
3946
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3947

    
3948
    # os verification
3949
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3950
    result.Raise()
3951
    if not isinstance(result.data, objects.OS):
3952
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3953
                                 " primary node"  % self.op.os_type)
3954

    
3955
    # bridge check on primary node
3956
    bridges = [n.bridge for n in self.nics]
3957
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3958
    result.Raise()
3959
    if not result.data:
3960
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
3961
                                 " exist on destination node '%s'" %
3962
                                 (",".join(bridges), pnode.name))
3963

    
3964
    # memory check on primary node
3965
    if self.op.start:
3966
      _CheckNodeFreeMemory(self, self.pnode.name,
3967
                           "creating instance %s" % self.op.instance_name,
3968
                           self.be_full[constants.BE_MEMORY],
3969
                           self.op.hypervisor)
3970

    
3971
    if self.op.start:
3972
      self.instance_status = 'up'
3973
    else:
3974
      self.instance_status = 'down'
3975

    
3976
  def Exec(self, feedback_fn):
3977
    """Create and add the instance to the cluster.
3978

3979
    """
3980
    instance = self.op.instance_name
3981
    pnode_name = self.pnode.name
3982

    
3983
    for nic in self.nics:
3984
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3985
        nic.mac = self.cfg.GenerateMAC()
3986

    
3987
    ht_kind = self.op.hypervisor
3988
    if ht_kind in constants.HTS_REQ_PORT:
3989
      network_port = self.cfg.AllocatePort()
3990
    else:
3991
      network_port = None
3992

    
3993
    ##if self.op.vnc_bind_address is None:
3994
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3995

    
3996
    # this is needed because os.path.join does not accept None arguments
3997
    if self.op.file_storage_dir is None:
3998
      string_file_storage_dir = ""
3999
    else:
4000
      string_file_storage_dir = self.op.file_storage_dir
4001

    
4002
    # build the full file storage dir path
4003
    file_storage_dir = os.path.normpath(os.path.join(
4004
                                        self.cfg.GetFileStorageDir(),
4005
                                        string_file_storage_dir, instance))
4006

    
4007

    
4008
    disks = _GenerateDiskTemplate(self,
4009
                                  self.op.disk_template,
4010
                                  instance, pnode_name,
4011
                                  self.secondaries,
4012
                                  self.disks,
4013
                                  file_storage_dir,
4014
                                  self.op.file_driver,
4015
                                  0)
4016

    
4017
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4018
                            primary_node=pnode_name,
4019
                            nics=self.nics, disks=disks,
4020
                            disk_template=self.op.disk_template,
4021
                            status=self.instance_status,
4022
                            network_port=network_port,
4023
                            beparams=self.op.beparams,
4024
                            hvparams=self.op.hvparams,
4025
                            hypervisor=self.op.hypervisor,
4026
                            )
4027

    
4028
    feedback_fn("* creating instance disks...")
4029
    if not _CreateDisks(self, iobj):
4030
      _RemoveDisks(self, iobj)
4031
      self.cfg.ReleaseDRBDMinors(instance)
4032
      raise errors.OpExecError("Device creation failed, reverting...")
4033

    
4034
    feedback_fn("adding instance %s to cluster config" % instance)
4035

    
4036
    self.cfg.AddInstance(iobj)
4037
    # Declare that we don't want to remove the instance lock anymore, as we've
4038
    # added the instance to the config
4039
    del self.remove_locks[locking.LEVEL_INSTANCE]
4040
    # Remove the temp. assignements for the instance's drbds
4041
    self.cfg.ReleaseDRBDMinors(instance)
4042
    # Unlock all the nodes
4043
    if self.op.mode == constants.INSTANCE_IMPORT:
4044
      nodes_keep = [self.op.src_node]
4045
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4046
                       if node != self.op.src_node]
4047
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4048
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4049
    else:
4050
      self.context.glm.release(locking.LEVEL_NODE)
4051
      del self.acquired_locks[locking.LEVEL_NODE]
4052

    
4053
    if self.op.wait_for_sync:
4054
      disk_abort = not _WaitForSync(self, iobj)
4055
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4056
      # make sure the disks are not degraded (still sync-ing is ok)
4057
      time.sleep(15)
4058
      feedback_fn("* checking mirrors status")
4059
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4060
    else:
4061
      disk_abort = False
4062

    
4063
    if disk_abort:
4064
      _RemoveDisks(self, iobj)
4065
      self.cfg.RemoveInstance(iobj.name)
4066
      # Make sure the instance lock gets removed
4067
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4068
      raise errors.OpExecError("There are some degraded disks for"
4069
                               " this instance")
4070

    
4071
    feedback_fn("creating os for instance %s on node %s" %
4072
                (instance, pnode_name))
4073

    
4074
    if iobj.disk_template != constants.DT_DISKLESS:
4075
      if self.op.mode == constants.INSTANCE_CREATE:
4076
        feedback_fn("* running the instance OS create scripts...")
4077
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4078
        result.Raise()
4079
        if not result.data:
4080
          raise errors.OpExecError("Could not add os for instance %s"
4081
                                   " on node %s" %
4082
                                   (instance, pnode_name))
4083

    
4084
      elif self.op.mode == constants.INSTANCE_IMPORT:
4085
        feedback_fn("* running the instance OS import scripts...")
4086
        src_node = self.op.src_node
4087
        src_images = self.src_images
4088
        cluster_name = self.cfg.GetClusterName()
4089
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4090
                                                         src_node, src_images,
4091
                                                         cluster_name)
4092
        import_result.Raise()
4093
        for idx, result in enumerate(import_result.data):
4094
          if not result:
4095
            self.LogWarning("Could not import the image %s for instance"
4096
                            " %s, disk %d, on node %s" %
4097
                            (src_images[idx], instance, idx, pnode_name))
4098
      else:
4099
        # also checked in the prereq part
4100
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4101
                                     % self.op.mode)
4102

    
4103
    if self.op.start:
4104
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4105
      feedback_fn("* starting instance...")
4106
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
4107
      result.Raise()
4108
      if not result.data:
4109
        raise errors.OpExecError("Could not start instance")
4110

    
4111

    
4112
class LUConnectConsole(NoHooksLU):
4113
  """Connect to an instance's console.
4114

4115
  This is somewhat special in that it returns the command line that
4116
  you need to run on the master node in order to connect to the
4117
  console.
4118

4119
  """
4120
  _OP_REQP = ["instance_name"]
4121
  REQ_BGL = False
4122

    
4123
  def ExpandNames(self):
4124
    self._ExpandAndLockInstance()
4125

    
4126
  def CheckPrereq(self):
4127
    """Check prerequisites.
4128

4129
    This checks that the instance is in the cluster.
4130

4131
    """
4132
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4133
    assert self.instance is not None, \
4134
      "Cannot retrieve locked instance %s" % self.op.instance_name
4135

    
4136
  def Exec(self, feedback_fn):
4137
    """Connect to the console of an instance
4138

4139
    """
4140
    instance = self.instance
4141
    node = instance.primary_node
4142

    
4143
    node_insts = self.rpc.call_instance_list([node],
4144
                                             [instance.hypervisor])[node]
4145
    node_insts.Raise()
4146

    
4147
    if instance.name not in node_insts.data:
4148
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4149

    
4150
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4151

    
4152
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4153
    console_cmd = hyper.GetShellCommandForConsole(instance)
4154

    
4155
    # build ssh cmdline
4156
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4157

    
4158

    
4159
class LUReplaceDisks(LogicalUnit):
4160
  """Replace the disks of an instance.
4161

4162
  """
4163
  HPATH = "mirrors-replace"
4164
  HTYPE = constants.HTYPE_INSTANCE
4165
  _OP_REQP = ["instance_name", "mode", "disks"]
4166
  REQ_BGL = False
4167

    
4168
  def ExpandNames(self):
4169
    self._ExpandAndLockInstance()
4170

    
4171
    if not hasattr(self.op, "remote_node"):
4172
      self.op.remote_node = None
4173

    
4174
    ia_name = getattr(self.op, "iallocator", None)
4175
    if ia_name is not None:
4176
      if self.op.remote_node is not None:
4177
        raise errors.OpPrereqError("Give either the iallocator or the new"
4178
                                   " secondary, not both")
4179
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4180
    elif self.op.remote_node is not None:
4181
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4182
      if remote_node is None:
4183
        raise errors.OpPrereqError("Node '%s' not known" %
4184
                                   self.op.remote_node)
4185
      self.op.remote_node = remote_node
4186
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4187
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4188
    else:
4189
      self.needed_locks[locking.LEVEL_NODE] = []
4190
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4191

    
4192
  def DeclareLocks(self, level):
4193
    # If we're not already locking all nodes in the set we have to declare the
4194
    # instance's primary/secondary nodes.
4195
    if (level == locking.LEVEL_NODE and
4196
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4197
      self._LockInstancesNodes()
4198

    
4199
  def _RunAllocator(self):
4200
    """Compute a new secondary node using an IAllocator.
4201

4202
    """
4203
    ial = IAllocator(self,
4204
                     mode=constants.IALLOCATOR_MODE_RELOC,
4205
                     name=self.op.instance_name,
4206
                     relocate_from=[self.sec_node])
4207

    
4208
    ial.Run(self.op.iallocator)
4209

    
4210
    if not ial.success:
4211
      raise errors.OpPrereqError("Can't compute nodes using"
4212
                                 " iallocator '%s': %s" % (self.op.iallocator,
4213
                                                           ial.info))
4214
    if len(ial.nodes) != ial.required_nodes:
4215
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4216
                                 " of nodes (%s), required %s" %
4217
                                 (len(ial.nodes), ial.required_nodes))
4218
    self.op.remote_node = ial.nodes[0]
4219
    self.LogInfo("Selected new secondary for the instance: %s",
4220
                 self.op.remote_node)
4221

    
4222
  def BuildHooksEnv(self):
4223
    """Build hooks env.
4224

4225
    This runs on the master, the primary and all the secondaries.
4226

4227
    """
4228
    env = {
4229
      "MODE": self.op.mode,
4230
      "NEW_SECONDARY": self.op.remote_node,
4231
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4232
      }
4233
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4234
    nl = [
4235
      self.cfg.GetMasterNode(),
4236
      self.instance.primary_node,
4237
      ]
4238
    if self.op.remote_node is not None:
4239
      nl.append(self.op.remote_node)
4240
    return env, nl, nl
4241

    
4242
  def CheckPrereq(self):
4243
    """Check prerequisites.
4244

4245
    This checks that the instance is in the cluster.
4246

4247
    """
4248
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4249
    assert instance is not None, \
4250
      "Cannot retrieve locked instance %s" % self.op.instance_name
4251
    self.instance = instance
4252

    
4253
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4254
      raise errors.OpPrereqError("Instance's disk layout is not"
4255
                                 " network mirrored.")
4256

    
4257
    if len(instance.secondary_nodes) != 1:
4258
      raise errors.OpPrereqError("The instance has a strange layout,"
4259
                                 " expected one secondary but found %d" %
4260
                                 len(instance.secondary_nodes))
4261

    
4262
    self.sec_node = instance.secondary_nodes[0]
4263

    
4264
    ia_name = getattr(self.op, "iallocator", None)
4265
    if ia_name is not None:
4266
      self._RunAllocator()
4267

    
4268
    remote_node = self.op.remote_node
4269
    if remote_node is not None:
4270
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4271
      assert self.remote_node_info is not None, \
4272
        "Cannot retrieve locked node %s" % remote_node
4273
    else:
4274
      self.remote_node_info = None
4275
    if remote_node == instance.primary_node:
4276
      raise errors.OpPrereqError("The specified node is the primary node of"
4277
                                 " the instance.")
4278
    elif remote_node == self.sec_node:
4279
      if self.op.mode == constants.REPLACE_DISK_SEC:
4280
        # this is for DRBD8, where we can't execute the same mode of
4281
        # replacement as for drbd7 (no different port allocated)
4282
        raise errors.OpPrereqError("Same secondary given, cannot execute"
4283
                                   " replacement")
4284
    if instance.disk_template == constants.DT_DRBD8:
4285
      if (self.op.mode == constants.REPLACE_DISK_ALL and
4286
          remote_node is not None):
4287
        # switch to replace secondary mode
4288
        self.op.mode = constants.REPLACE_DISK_SEC
4289

    
4290
      if self.op.mode == constants.REPLACE_DISK_ALL:
4291
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4292
                                   " secondary disk replacement, not"
4293
                                   " both at once")
4294
      elif self.op.mode == constants.REPLACE_DISK_PRI:
4295
        if remote_node is not None:
4296
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4297
                                     " the secondary while doing a primary"
4298
                                     " node disk replacement")
4299
        self.tgt_node = instance.primary_node
4300
        self.oth_node = instance.secondary_nodes[0]
4301
      elif self.op.mode == constants.REPLACE_DISK_SEC:
4302
        self.new_node = remote_node # this can be None, in which case
4303
                                    # we don't change the secondary
4304
        self.tgt_node = instance.secondary_nodes[0]
4305
        self.oth_node = instance.primary_node
4306
      else:
4307
        raise errors.ProgrammerError("Unhandled disk replace mode")
4308

    
4309
    if not self.op.disks:
4310
      self.op.disks = range(len(instance.disks))
4311

    
4312
    for disk_idx in self.op.disks:
4313
      instance.FindDisk(disk_idx)
4314

    
4315
  def _ExecD8DiskOnly(self, feedback_fn):
4316
    """Replace a disk on the primary or secondary for dbrd8.
4317

4318
    The algorithm for replace is quite complicated:
4319

4320
      1. for each disk to be replaced:
4321

4322
        1. create new LVs on the target node with unique names
4323
        1. detach old LVs from the drbd device
4324
        1. rename old LVs to name_replaced.<time_t>
4325
        1. rename new LVs to old LVs
4326
        1. attach the new LVs (with the old names now) to the drbd device
4327

4328
      1. wait for sync across all devices
4329

4330
      1. for each modified disk:
4331

4332
        1. remove old LVs (which have the name name_replaces.<time_t>)
4333

4334
    Failures are not very well handled.
4335

4336
    """
4337
    steps_total = 6
4338
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4339
    instance = self.instance
4340
    iv_names = {}
4341
    vgname = self.cfg.GetVGName()
4342
    # start of work
4343
    cfg = self.cfg
4344
    tgt_node = self.tgt_node
4345
    oth_node = self.oth_node
4346

    
4347
    # Step: check device activation
4348
    self.proc.LogStep(1, steps_total, "check device existence")
4349
    info("checking volume groups")
4350
    my_vg = cfg.GetVGName()
4351
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4352
    if not results:
4353
      raise errors.OpExecError("Can't list volume groups on the nodes")
4354
    for node in oth_node, tgt_node:
4355
      res = results[node]
4356
      if res.failed or not res.data or my_vg not in res.data:
4357
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4358
                                 (my_vg, node))
4359
    for idx, dev in enumerate(instance.disks):
4360
      if idx not in self.op.disks:
4361
        continue
4362
      for node in tgt_node, oth_node:
4363
        info("checking disk/%d on %s" % (idx, node))
4364
        cfg.SetDiskID(dev, node)
4365
        if not self.rpc.call_blockdev_find(node, dev):
4366
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4367
                                   (idx, node))
4368

    
4369
    # Step: check other node consistency
4370
    self.proc.LogStep(2, steps_total, "check peer consistency")
4371
    for idx, dev in enumerate(instance.disks):
4372
      if idx not in self.op.disks:
4373
        continue
4374
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4375
      if not _CheckDiskConsistency(self, dev, oth_node,
4376
                                   oth_node==instance.primary_node):
4377
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4378
                                 " to replace disks on this node (%s)" %
4379
                                 (oth_node, tgt_node))
4380

    
4381
    # Step: create new storage
4382
    self.proc.LogStep(3, steps_total, "allocate new storage")
4383
    for idx, dev in enumerate(instance.disks):
4384
      if idx not in self.op.disks:
4385
        continue
4386
      size = dev.size
4387
      cfg.SetDiskID(dev, tgt_node)
4388
      lv_names = [".disk%d_%s" % (idx, suf)
4389
                  for suf in ["data", "meta"]]
4390
      names = _GenerateUniqueNames(self, lv_names)
4391
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4392
                             logical_id=(vgname, names[0]))
4393
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4394
                             logical_id=(vgname, names[1]))
4395
      new_lvs = [lv_data, lv_meta]
4396
      old_lvs = dev.children
4397
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4398
      info("creating new local storage on %s for %s" %
4399
           (tgt_node, dev.iv_name))
4400
      # since we *always* want to create this LV, we use the
4401
      # _Create...OnPrimary (which forces the creation), even if we
4402
      # are talking about the secondary node
4403
      for new_lv in new_lvs:
4404
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4405
                                        _GetInstanceInfoText(instance)):
4406
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4407
                                   " node '%s'" %
4408
                                   (new_lv.logical_id[1], tgt_node))
4409

    
4410
    # Step: for each lv, detach+rename*2+attach
4411
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4412
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4413
      info("detaching %s drbd from local storage" % dev.iv_name)
4414
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4415
      result.Raise()
4416
      if not result.data:
4417
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4418
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4419
      #dev.children = []
4420
      #cfg.Update(instance)
4421

    
4422
      # ok, we created the new LVs, so now we know we have the needed
4423
      # storage; as such, we proceed on the target node to rename
4424
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4425
      # using the assumption that logical_id == physical_id (which in
4426
      # turn is the unique_id on that node)
4427

    
4428
      # FIXME(iustin): use a better name for the replaced LVs
4429
      temp_suffix = int(time.time())
4430
      ren_fn = lambda d, suff: (d.physical_id[0],
4431
                                d.physical_id[1] + "_replaced-%s" % suff)
4432
      # build the rename list based on what LVs exist on the node
4433
      rlist = []
4434
      for to_ren in old_lvs:
4435
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4436
        if not find_res.failed and find_res.data is not None: # device exists
4437
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4438

    
4439
      info("renaming the old LVs on the target node")
4440
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4441
      result.Raise()
4442
      if not result.data:
4443
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4444
      # now we rename the new LVs to the old LVs
4445
      info("renaming the new LVs on the target node")
4446
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4447
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4448
      result.Raise()
4449
      if not result.data:
4450
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4451

    
4452
      for old, new in zip(old_lvs, new_lvs):
4453
        new.logical_id = old.logical_id
4454
        cfg.SetDiskID(new, tgt_node)
4455

    
4456
      for disk in old_lvs:
4457
        disk.logical_id = ren_fn(disk, temp_suffix)
4458
        cfg.SetDiskID(disk, tgt_node)
4459

    
4460
      # now that the new lvs have the old name, we can add them to the device
4461
      info("adding new mirror component on %s" % tgt_node)
4462
      result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4463
      if result.failed or not result.data:
4464
        for new_lv in new_lvs:
4465
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4466
          if result.failed or not result.data:
4467
            warning("Can't rollback device %s", hint="manually cleanup unused"
4468
                    " logical volumes")
4469
        raise errors.OpExecError("Can't add local storage to drbd")
4470

    
4471
      dev.children = new_lvs
4472
      cfg.Update(instance)
4473

    
4474
    # Step: wait for sync
4475

    
4476
    # this can fail as the old devices are degraded and _WaitForSync
4477
    # does a combined result over all disks, so we don't check its
4478
    # return value
4479
    self.proc.LogStep(5, steps_total, "sync devices")
4480
    _WaitForSync(self, instance, unlock=True)
4481

    
4482
    # so check manually all the devices
4483
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4484
      cfg.SetDiskID(dev, instance.primary_node)
4485
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4486
      if result.failed or result.data[5]:
4487
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4488

    
4489
    # Step: remove old storage
4490
    self.proc.LogStep(6, steps_total, "removing old storage")
4491
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4492
      info("remove logical volumes for %s" % name)
4493
      for lv in old_lvs:
4494
        cfg.SetDiskID(lv, tgt_node)
4495
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
4496
        if result.failed or not result.data:
4497
          warning("Can't remove old LV", hint="manually remove unused LVs")
4498
          continue
4499

    
4500
  def _ExecD8Secondary(self, feedback_fn):
4501
    """Replace the secondary node for drbd8.
4502

4503
    The algorithm for replace is quite complicated:
4504
      - for all disks of the instance:
4505
        - create new LVs on the new node with same names
4506
        - shutdown the drbd device on the old secondary
4507
        - disconnect the drbd network on the primary
4508
        - create the drbd device on the new secondary
4509
        - network attach the drbd on the primary, using an artifice:
4510
          the drbd code for Attach() will connect to the network if it
4511
          finds a device which is connected to the good local disks but
4512
          not network enabled
4513
      - wait for sync across all devices
4514
      - remove all disks from the old secondary
4515

4516
    Failures are not very well handled.
4517

4518
    """
4519
    steps_total = 6
4520
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4521
    instance = self.instance
4522
    iv_names = {}
4523
    vgname = self.cfg.GetVGName()
4524
    # start of work
4525
    cfg = self.cfg
4526
    old_node = self.tgt_node
4527
    new_node = self.new_node
4528
    pri_node = instance.primary_node
4529

    
4530
    # Step: check device activation
4531
    self.proc.LogStep(1, steps_total, "check device existence")
4532
    info("checking volume groups")
4533
    my_vg = cfg.GetVGName()
4534
    results = self.rpc.call_vg_list([pri_node, new_node])
4535
    for node in pri_node, new_node:
4536
      res = results[node]
4537
      if res.failed or not res.data or my_vg not in res.data:
4538
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4539
                                 (my_vg, node))
4540
    for idx, dev in enumerate(instance.disks):
4541
      if idx not in self.op.disks:
4542
        continue
4543
      info("checking disk/%d on %s" % (idx, pri_node))
4544
      cfg.SetDiskID(dev, pri_node)
4545
      result = self.rpc.call_blockdev_find(pri_node, dev)
4546
      result.Raise()
4547
      if not result.data:
4548
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4549
                                 (idx, pri_node))
4550

    
4551
    # Step: check other node consistency
4552
    self.proc.LogStep(2, steps_total, "check peer consistency")
4553
    for idx, dev in enumerate(instance.disks):
4554
      if idx not in self.op.disks:
4555
        continue
4556
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4557
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4558
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4559
                                 " unsafe to replace the secondary" %
4560
                                 pri_node)
4561

    
4562
    # Step: create new storage
4563
    self.proc.LogStep(3, steps_total, "allocate new storage")
4564
    for idx, dev in enumerate(instance.disks):
4565
      size = dev.size
4566
      info("adding new local storage on %s for disk/%d" %
4567
           (new_node, idx))
4568
      # since we *always* want to create this LV, we use the
4569
      # _Create...OnPrimary (which forces the creation), even if we
4570
      # are talking about the secondary node
4571
      for new_lv in dev.children:
4572
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4573
                                        _GetInstanceInfoText(instance)):
4574
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4575
                                   " node '%s'" %
4576
                                   (new_lv.logical_id[1], new_node))
4577

    
4578
    # Step 4: dbrd minors and drbd setups changes
4579
    # after this, we must manually remove the drbd minors on both the
4580
    # error and the success paths
4581
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4582
                                   instance.name)
4583
    logging.debug("Allocated minors %s" % (minors,))
4584
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4585
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4586
      size = dev.size
4587
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4588
      # create new devices on new_node
4589
      if pri_node == dev.logical_id[0]:
4590
        new_logical_id = (pri_node, new_node,
4591
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4592
                          dev.logical_id[5])
4593
      else:
4594
        new_logical_id = (new_node, pri_node,
4595
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4596
                          dev.logical_id[5])
4597
      iv_names[idx] = (dev, dev.children, new_logical_id)
4598
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4599
                    new_logical_id)
4600
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4601
                              logical_id=new_logical_id,
4602
                              children=dev.children)
4603
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4604
                                        new_drbd, False,
4605
                                        _GetInstanceInfoText(instance)):
4606
        self.cfg.ReleaseDRBDMinors(instance.name)
4607
        raise errors.OpExecError("Failed to create new DRBD on"
4608
                                 " node '%s'" % new_node)
4609

    
4610
    for idx, dev in enumerate(instance.disks):
4611
      # we have new devices, shutdown the drbd on the old secondary
4612
      info("shutting down drbd for disk/%d on old node" % idx)
4613
      cfg.SetDiskID(dev, old_node)
4614
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
4615
      if result.failed or not result.data:
4616
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4617
                hint="Please cleanup this device manually as soon as possible")
4618

    
4619
    info("detaching primary drbds from the network (=> standalone)")
4620
    done = 0
4621
    for idx, dev in enumerate(instance.disks):
4622
      cfg.SetDiskID(dev, pri_node)
4623
      # set the network part of the physical (unique in bdev terms) id
4624
      # to None, meaning detach from network
4625
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4626
      # and 'find' the device, which will 'fix' it to match the
4627
      # standalone state
4628
      result = self.rpc.call_blockdev_find(pri_node, dev)
4629
      if not result.failed and result.data:
4630
        done += 1
4631
      else:
4632
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4633
                idx)
4634

    
4635
    if not done:
4636
      # no detaches succeeded (very unlikely)
4637
      self.cfg.ReleaseDRBDMinors(instance.name)
4638
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4639

    
4640
    # if we managed to detach at least one, we update all the disks of
4641
    # the instance to point to the new secondary
4642
    info("updating instance configuration")
4643
    for dev, _, new_logical_id in iv_names.itervalues():
4644
      dev.logical_id = new_logical_id
4645
      cfg.SetDiskID(dev, pri_node)
4646
    cfg.Update(instance)
4647
    # we can remove now the temp minors as now the new values are
4648
    # written to the config file (and therefore stable)
4649
    self.cfg.ReleaseDRBDMinors(instance.name)
4650

    
4651
    # and now perform the drbd attach
4652
    info("attaching primary drbds to new secondary (standalone => connected)")
4653
    failures = []
4654
    for idx, dev in enumerate(instance.disks):
4655
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4656
      # since the attach is smart, it's enough to 'find' the device,
4657
      # it will automatically activate the network, if the physical_id
4658
      # is correct
4659
      cfg.SetDiskID(dev, pri_node)
4660
      logging.debug("Disk to attach: %s", dev)
4661
      result = self.rpc.call_blockdev_find(pri_node, dev)
4662
      if result.failed or not result.data:
4663
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4664
                "please do a gnt-instance info to see the status of disks")
4665

    
4666
    # this can fail as the old devices are degraded and _WaitForSync
4667
    # does a combined result over all disks, so we don't check its
4668
    # return value
4669
    self.proc.LogStep(5, steps_total, "sync devices")
4670
    _WaitForSync(self, instance, unlock=True)
4671

    
4672
    # so check manually all the devices
4673
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4674
      cfg.SetDiskID(dev, pri_node)
4675
      result = self.rpc.call_blockdev_find(pri_node, dev)
4676
      result.Raise()
4677
      if result.data[5]:
4678
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4679

    
4680
    self.proc.LogStep(6, steps_total, "removing old storage")
4681
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4682
      info("remove logical volumes for disk/%d" % idx)
4683
      for lv in old_lvs:
4684
        cfg.SetDiskID(lv, old_node)
4685
        result = self.rpc.call_blockdev_remove(old_node, lv)
4686
        if result.failed or not result.data:
4687
          warning("Can't remove LV on old secondary",
4688
                  hint="Cleanup stale volumes by hand")
4689

    
4690
  def Exec(self, feedback_fn):
4691
    """Execute disk replacement.
4692

4693
    This dispatches the disk replacement to the appropriate handler.
4694

4695
    """
4696
    instance = self.instance
4697

    
4698
    # Activate the instance disks if we're replacing them on a down instance
4699
    if instance.status == "down":
4700
      _StartInstanceDisks(self, instance, True)
4701

    
4702
    if instance.disk_template == constants.DT_DRBD8:
4703
      if self.op.remote_node is None:
4704
        fn = self._ExecD8DiskOnly
4705
      else:
4706
        fn = self._ExecD8Secondary
4707
    else:
4708
      raise errors.ProgrammerError("Unhandled disk replacement case")
4709

    
4710
    ret = fn(feedback_fn)
4711

    
4712
    # Deactivate the instance disks if we're replacing them on a down instance
4713
    if instance.status == "down":
4714
      _SafeShutdownInstanceDisks(self, instance)
4715

    
4716
    return ret
4717

    
4718

    
4719
class LUGrowDisk(LogicalUnit):
4720
  """Grow a disk of an instance.
4721

4722
  """
4723
  HPATH = "disk-grow"
4724
  HTYPE = constants.HTYPE_INSTANCE
4725
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4726
  REQ_BGL = False
4727

    
4728
  def ExpandNames(self):
4729
    self._ExpandAndLockInstance()
4730
    self.needed_locks[locking.LEVEL_NODE] = []
4731
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4732

    
4733
  def DeclareLocks(self, level):
4734
    if level == locking.LEVEL_NODE:
4735
      self._LockInstancesNodes()
4736

    
4737
  def BuildHooksEnv(self):
4738
    """Build hooks env.
4739

4740
    This runs on the master, the primary and all the secondaries.
4741

4742
    """
4743
    env = {
4744
      "DISK": self.op.disk,
4745
      "AMOUNT": self.op.amount,
4746
      }
4747
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4748
    nl = [
4749
      self.cfg.GetMasterNode(),
4750
      self.instance.primary_node,
4751
      ]
4752
    return env, nl, nl
4753

    
4754
  def CheckPrereq(self):
4755
    """Check prerequisites.
4756

4757
    This checks that the instance is in the cluster.
4758

4759
    """
4760
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4761
    assert instance is not None, \
4762
      "Cannot retrieve locked instance %s" % self.op.instance_name
4763

    
4764
    self.instance = instance
4765

    
4766
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4767
      raise errors.OpPrereqError("Instance's disk layout does not support"
4768
                                 " growing.")
4769

    
4770
    self.disk = instance.FindDisk(self.op.disk)
4771

    
4772
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4773
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4774
                                       instance.hypervisor)
4775
    for node in nodenames:
4776
      info = nodeinfo[node]
4777
      if info.failed or not info.data:
4778
        raise errors.OpPrereqError("Cannot get current information"
4779
                                   " from node '%s'" % node)
4780
      vg_free = info.data.get('vg_free', None)
4781
      if not isinstance(vg_free, int):
4782
        raise errors.OpPrereqError("Can't compute free disk space on"
4783
                                   " node %s" % node)
4784
      if self.op.amount > vg_free:
4785
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4786
                                   " %d MiB available, %d MiB required" %
4787
                                   (node, vg_free, self.op.amount))
4788

    
4789
  def Exec(self, feedback_fn):
4790
    """Execute disk grow.
4791

4792
    """
4793
    instance = self.instance
4794
    disk = self.disk
4795
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4796
      self.cfg.SetDiskID(disk, node)
4797
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4798
      result.Raise()
4799
      if (not result.data or not isinstance(result.data, (list, tuple)) or
4800
          len(result.data) != 2):
4801
        raise errors.OpExecError("Grow request failed to node %s" % node)
4802
      elif not result.data[0]:
4803
        raise errors.OpExecError("Grow request failed to node %s: %s" %
4804
                                 (node, result.data[1]))
4805
    disk.RecordGrow(self.op.amount)
4806
    self.cfg.Update(instance)
4807
    if self.op.wait_for_sync:
4808
      disk_abort = not _WaitForSync(self, instance)
4809
      if disk_abort:
4810
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4811
                             " status.\nPlease check the instance.")
4812

    
4813

    
4814
class LUQueryInstanceData(NoHooksLU):
4815
  """Query runtime instance data.
4816

4817
  """
4818
  _OP_REQP = ["instances", "static"]
4819
  REQ_BGL = False
4820

    
4821
  def ExpandNames(self):
4822
    self.needed_locks = {}
4823
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4824

    
4825
    if not isinstance(self.op.instances, list):
4826
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4827

    
4828
    if self.op.instances:
4829
      self.wanted_names = []
4830
      for name in self.op.instances:
4831
        full_name = self.cfg.ExpandInstanceName(name)
4832
        if full_name is None:
4833
          raise errors.OpPrereqError("Instance '%s' not known" %
4834
                                     self.op.instance_name)
4835
        self.wanted_names.append(full_name)
4836
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4837
    else:
4838
      self.wanted_names = None
4839
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4840

    
4841
    self.needed_locks[locking.LEVEL_NODE] = []
4842
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4843

    
4844
  def DeclareLocks(self, level):
4845
    if level == locking.LEVEL_NODE:
4846
      self._LockInstancesNodes()
4847

    
4848
  def CheckPrereq(self):
4849
    """Check prerequisites.
4850

4851
    This only checks the optional instance list against the existing names.
4852

4853
    """
4854
    if self.wanted_names is None:
4855
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4856

    
4857
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4858
                             in self.wanted_names]
4859
    return
4860

    
4861
  def _ComputeDiskStatus(self, instance, snode, dev):
4862
    """Compute block device status.
4863

4864
    """
4865
    static = self.op.static
4866
    if not static:
4867
      self.cfg.SetDiskID(dev, instance.primary_node)
4868
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4869
      dev_pstatus.Raise()
4870
      dev_pstatus = dev_pstatus.data
4871
    else:
4872
      dev_pstatus = None
4873

    
4874
    if dev.dev_type in constants.LDS_DRBD:
4875
      # we change the snode then (otherwise we use the one passed in)
4876
      if dev.logical_id[0] == instance.primary_node:
4877
        snode = dev.logical_id[1]
4878
      else:
4879
        snode = dev.logical_id[0]
4880

    
4881
    if snode and not static:
4882
      self.cfg.SetDiskID(dev, snode)
4883
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4884
      dev_sstatus.Raise()
4885
      dev_sstatus = dev_sstatus.data
4886
    else:
4887
      dev_sstatus = None
4888

    
4889
    if dev.children:
4890
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4891
                      for child in dev.children]
4892
    else:
4893
      dev_children = []
4894

    
4895
    data = {
4896
      "iv_name": dev.iv_name,
4897
      "dev_type": dev.dev_type,
4898
      "logical_id": dev.logical_id,
4899
      "physical_id": dev.physical_id,
4900
      "pstatus": dev_pstatus,
4901
      "sstatus": dev_sstatus,
4902
      "children": dev_children,
4903
      "mode": dev.mode,
4904
      }
4905

    
4906
    return data
4907

    
4908
  def Exec(self, feedback_fn):
4909
    """Gather and return data"""
4910
    result = {}
4911

    
4912
    cluster = self.cfg.GetClusterInfo()
4913

    
4914
    for instance in self.wanted_instances:
4915
      if not self.op.static:
4916
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4917
                                                  instance.name,
4918
                                                  instance.hypervisor)
4919
        remote_info.Raise()
4920
        remote_info = remote_info.data
4921
        if remote_info and "state" in remote_info:
4922
          remote_state = "up"
4923
        else:
4924
          remote_state = "down"
4925
      else:
4926
        remote_state = None
4927
      if instance.status == "down":
4928
        config_state = "down"
4929
      else:
4930
        config_state = "up"
4931

    
4932
      disks = [self._ComputeDiskStatus(instance, None, device)
4933
               for device in instance.disks]
4934

    
4935
      idict = {
4936
        "name": instance.name,
4937
        "config_state": config_state,
4938
        "run_state": remote_state,
4939
        "pnode": instance.primary_node,
4940
        "snodes": instance.secondary_nodes,
4941
        "os": instance.os,
4942
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4943
        "disks": disks,
4944
        "hypervisor": instance.hypervisor,
4945
        "network_port": instance.network_port,
4946
        "hv_instance": instance.hvparams,
4947
        "hv_actual": cluster.FillHV(instance),
4948
        "be_instance": instance.beparams,
4949
        "be_actual": cluster.FillBE(instance),
4950
        }
4951

    
4952
      result[instance.name] = idict
4953

    
4954
    return result
4955

    
4956

    
4957
class LUSetInstanceParams(LogicalUnit):
4958
  """Modifies an instances's parameters.
4959

4960
  """
4961
  HPATH = "instance-modify"
4962
  HTYPE = constants.HTYPE_INSTANCE
4963
  _OP_REQP = ["instance_name"]
4964
  REQ_BGL = False
4965

    
4966
  def CheckArguments(self):
4967
    if not hasattr(self.op, 'nics'):
4968
      self.op.nics = []
4969
    if not hasattr(self.op, 'disks'):
4970
      self.op.disks = []
4971
    if not hasattr(self.op, 'beparams'):
4972
      self.op.beparams = {}
4973
    if not hasattr(self.op, 'hvparams'):
4974
      self.op.hvparams = {}
4975
    self.op.force = getattr(self.op, "force", False)
4976
    if not (self.op.nics or self.op.disks or
4977
            self.op.hvparams or self.op.beparams):
4978
      raise errors.OpPrereqError("No changes submitted")
4979

    
4980
    utils.CheckBEParams(self.op.beparams)
4981

    
4982
    # Disk validation
4983
    disk_addremove = 0
4984
    for disk_op, disk_dict in self.op.disks:
4985
      if disk_op == constants.DDM_REMOVE:
4986
        disk_addremove += 1
4987
        continue
4988
      elif disk_op == constants.DDM_ADD:
4989
        disk_addremove += 1
4990
      else:
4991
        if not isinstance(disk_op, int):
4992
          raise errors.OpPrereqError("Invalid disk index")
4993
      if disk_op == constants.DDM_ADD:
4994
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4995
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4996
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4997
        size = disk_dict.get('size', None)
4998
        if size is None:
4999
          raise errors.OpPrereqError("Required disk parameter size missing")
5000
        try:
5001
          size = int(size)
5002
        except ValueError, err:
5003
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5004
                                     str(err))
5005
        disk_dict['size'] = size
5006
      else:
5007
        # modification of disk
5008
        if 'size' in disk_dict:
5009
          raise errors.OpPrereqError("Disk size change not possible, use"
5010
                                     " grow-disk")
5011

    
5012
    if disk_addremove > 1:
5013
      raise errors.OpPrereqError("Only one disk add or remove operation"
5014
                                 " supported at a time")
5015

    
5016
    # NIC validation
5017
    nic_addremove = 0
5018
    for nic_op, nic_dict in self.op.nics:
5019
      if nic_op == constants.DDM_REMOVE:
5020
        nic_addremove += 1
5021
        continue
5022
      elif nic_op == constants.DDM_ADD:
5023
        nic_addremove += 1
5024
      else:
5025
        if not isinstance(nic_op, int):
5026
          raise errors.OpPrereqError("Invalid nic index")
5027

    
5028
      # nic_dict should be a dict
5029
      nic_ip = nic_dict.get('ip', None)
5030
      if nic_ip is not None:
5031
        if nic_ip.lower() == "none":
5032
          nic_dict['ip'] = None
5033
        else:
5034
          if not utils.IsValidIP(nic_ip):
5035
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5036
      # we can only check None bridges and assign the default one
5037
      nic_bridge = nic_dict.get('bridge', None)
5038
      if nic_bridge is None:
5039
        nic_dict['bridge'] = self.cfg.GetDefBridge()
5040
      # but we can validate MACs
5041
      nic_mac = nic_dict.get('mac', None)
5042
      if nic_mac is not None:
5043
        if self.cfg.IsMacInUse(nic_mac):
5044
          raise errors.OpPrereqError("MAC address %s already in use"
5045
                                     " in cluster" % nic_mac)
5046
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5047
          if not utils.IsValidMac(nic_mac):
5048
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5049
    if nic_addremove > 1:
5050
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5051
                                 " supported at a time")
5052

    
5053
  def ExpandNames(self):
5054
    self._ExpandAndLockInstance()
5055
    self.needed_locks[locking.LEVEL_NODE] = []
5056
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5057

    
5058
  def DeclareLocks(self, level):
5059
    if level == locking.LEVEL_NODE:
5060
      self._LockInstancesNodes()
5061

    
5062
  def BuildHooksEnv(self):
5063
    """Build hooks env.
5064

5065
    This runs on the master, primary and secondaries.
5066

5067
    """
5068
    args = dict()
5069
    if constants.BE_MEMORY in self.be_new:
5070
      args['memory'] = self.be_new[constants.BE_MEMORY]
5071
    if constants.BE_VCPUS in self.be_new:
5072
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5073
    # FIXME: readd disk/nic changes
5074
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5075
    nl = [self.cfg.GetMasterNode(),
5076
          self.instance.primary_node] + list(self.instance.secondary_nodes)
5077
    return env, nl, nl
5078

    
5079
  def CheckPrereq(self):
5080
    """Check prerequisites.
5081

5082
    This only checks the instance list against the existing names.
5083

5084
    """
5085
    force = self.force = self.op.force
5086

    
5087
    # checking the new params on the primary/secondary nodes
5088

    
5089
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5090
    assert self.instance is not None, \
5091
      "Cannot retrieve locked instance %s" % self.op.instance_name
5092
    pnode = self.instance.primary_node
5093
    nodelist = [pnode]
5094
    nodelist.extend(instance.secondary_nodes)
5095

    
5096
    # hvparams processing
5097
    if self.op.hvparams:
5098
      i_hvdict = copy.deepcopy(instance.hvparams)
5099
      for key, val in self.op.hvparams.iteritems():
5100
        if val == constants.VALUE_DEFAULT:
5101
          try:
5102
            del i_hvdict[key]
5103
          except KeyError:
5104
            pass
5105
        elif val == constants.VALUE_NONE:
5106
          i_hvdict[key] = None
5107
        else:
5108
          i_hvdict[key] = val
5109
      cluster = self.cfg.GetClusterInfo()
5110
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5111
                                i_hvdict)
5112
      # local check
5113
      hypervisor.GetHypervisor(
5114
        instance.hypervisor).CheckParameterSyntax(hv_new)
5115
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5116
      self.hv_new = hv_new # the new actual values
5117
      self.hv_inst = i_hvdict # the new dict (without defaults)
5118
    else:
5119
      self.hv_new = self.hv_inst = {}
5120

    
5121
    # beparams processing
5122
    if self.op.beparams:
5123
      i_bedict = copy.deepcopy(instance.beparams)
5124
      for key, val in self.op.beparams.iteritems():
5125
        if val == constants.VALUE_DEFAULT:
5126
          try:
5127
            del i_bedict[key]
5128
          except KeyError:
5129
            pass
5130
        else:
5131
          i_bedict[key] = val
5132
      cluster = self.cfg.GetClusterInfo()
5133
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5134
                                i_bedict)
5135
      self.be_new = be_new # the new actual values
5136
      self.be_inst = i_bedict # the new dict (without defaults)
5137
    else:
5138
      self.be_new = self.be_inst = {}
5139

    
5140
    self.warn = []
5141

    
5142
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5143
      mem_check_list = [pnode]
5144
      if be_new[constants.BE_AUTO_BALANCE]:
5145
        # either we changed auto_balance to yes or it was from before
5146
        mem_check_list.extend(instance.secondary_nodes)
5147
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5148
                                                  instance.hypervisor)
5149
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5150
                                         instance.hypervisor)
5151
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5152
        # Assume the primary node is unreachable and go ahead
5153
        self.warn.append("Can't get info from primary node %s" % pnode)
5154
      else:
5155
        if not instance_info.failed and instance_info.data:
5156
          current_mem = instance_info.data['memory']
5157
        else:
5158
          # Assume instance not running
5159
          # (there is a slight race condition here, but it's not very probable,
5160
          # and we have no other way to check)
5161
          current_mem = 0
5162
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5163
                    nodeinfo[pnode].data['memory_free'])
5164
        if miss_mem > 0:
5165
          raise errors.OpPrereqError("This change will prevent the instance"
5166
                                     " from starting, due to %d MB of memory"
5167
                                     " missing on its primary node" % miss_mem)
5168

    
5169
      if be_new[constants.BE_AUTO_BALANCE]:
5170
        for node, nres in instance.secondary_nodes.iteritems():
5171
          if nres.failed or not isinstance(nres.data, dict):
5172
            self.warn.append("Can't get info from secondary node %s" % node)
5173
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5174
            self.warn.append("Not enough memory to failover instance to"
5175
                             " secondary node %s" % node)
5176

    
5177
    # NIC processing
5178
    for nic_op, nic_dict in self.op.nics:
5179
      if nic_op == constants.DDM_REMOVE:
5180
        if not instance.nics:
5181
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5182
        continue
5183
      if nic_op != constants.DDM_ADD:
5184
        # an existing nic
5185
        if nic_op < 0 or nic_op >= len(instance.nics):
5186
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5187
                                     " are 0 to %d" %
5188
                                     (nic_op, len(instance.nics)))
5189
      nic_bridge = nic_dict.get('bridge', None)
5190
      if nic_bridge is not None:
5191
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5192
          msg = ("Bridge '%s' doesn't exist on one of"
5193
                 " the instance nodes" % nic_bridge)
5194
          if self.force:
5195
            self.warn.append(msg)
5196
          else:
5197
            raise errors.OpPrereqError(msg)
5198

    
5199
    # DISK processing
5200
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5201
      raise errors.OpPrereqError("Disk operations not supported for"
5202
                                 " diskless instances")
5203
    for disk_op, disk_dict in self.op.disks:
5204
      if disk_op == constants.DDM_REMOVE:
5205
        if len(instance.disks) == 1:
5206
          raise errors.OpPrereqError("Cannot remove the last disk of"
5207
                                     " an instance")
5208
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5209
        ins_l = ins_l[pnode]
5210
        if not type(ins_l) is list:
5211
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5212
        if instance.name in ins_l:
5213
          raise errors.OpPrereqError("Instance is running, can't remove"
5214
                                     " disks.")
5215

    
5216
      if (disk_op == constants.DDM_ADD and
5217
          len(instance.nics) >= constants.MAX_DISKS):
5218
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5219
                                   " add more" % constants.MAX_DISKS)
5220
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5221
        # an existing disk
5222
        if disk_op < 0 or disk_op >= len(instance.disks):
5223
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5224
                                     " are 0 to %d" %
5225
                                     (disk_op, len(instance.disks)))
5226

    
5227
    return
5228

    
5229
  def Exec(self, feedback_fn):
5230
    """Modifies an instance.
5231

5232
    All parameters take effect only at the next restart of the instance.
5233

5234
    """
5235
    # Process here the warnings from CheckPrereq, as we don't have a
5236
    # feedback_fn there.
5237
    for warn in self.warn:
5238
      feedback_fn("WARNING: %s" % warn)
5239

    
5240
    result = []
5241
    instance = self.instance
5242
    # disk changes
5243
    for disk_op, disk_dict in self.op.disks:
5244
      if disk_op == constants.DDM_REMOVE:
5245
        # remove the last disk
5246
        device = instance.disks.pop()
5247
        device_idx = len(instance.disks)
5248
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5249
          self.cfg.SetDiskID(disk, node)
5250
          result = self.rpc.call_blockdev_remove(node, disk)
5251
          if result.failed or not result.data:
5252
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
5253
                                 " continuing anyway", device_idx, node)
5254
        result.append(("disk/%d" % device_idx, "remove"))
5255
      elif disk_op == constants.DDM_ADD:
5256
        # add a new disk
5257
        if instance.disk_template == constants.DT_FILE:
5258
          file_driver, file_path = instance.disks[0].logical_id
5259
          file_path = os.path.dirname(file_path)
5260
        else:
5261
          file_driver = file_path = None
5262
        disk_idx_base = len(instance.disks)
5263
        new_disk = _GenerateDiskTemplate(self,
5264
                                         instance.disk_template,
5265
                                         instance, instance.primary_node,
5266
                                         instance.secondary_nodes,
5267
                                         [disk_dict],
5268
                                         file_path,
5269
                                         file_driver,
5270
                                         disk_idx_base)[0]
5271
        new_disk.mode = disk_dict['mode']
5272
        instance.disks.append(new_disk)
5273
        info = _GetInstanceInfoText(instance)
5274

    
5275
        logging.info("Creating volume %s for instance %s",
5276
                     new_disk.iv_name, instance.name)
5277
        # Note: this needs to be kept in sync with _CreateDisks
5278
        #HARDCODE
5279
        for secondary_node in instance.secondary_nodes:
5280
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5281
                                            new_disk, False, info):
5282
            self.LogWarning("Failed to create volume %s (%s) on"
5283
                            " secondary node %s!",
5284
                            new_disk.iv_name, new_disk, secondary_node)
5285
        #HARDCODE
5286
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5287
                                        instance, new_disk, info):
5288
          self.LogWarning("Failed to create volume %s on primary!",
5289
                          new_disk.iv_name)
5290
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5291
                       (new_disk.size, new_disk.mode)))
5292
      else:
5293
        # change a given disk
5294
        instance.disks[disk_op].mode = disk_dict['mode']
5295
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5296
    # NIC changes
5297
    for nic_op, nic_dict in self.op.nics:
5298
      if nic_op == constants.DDM_REMOVE:
5299
        # remove the last nic
5300
        del instance.nics[-1]
5301
        result.append(("nic.%d" % len(instance.nics), "remove"))
5302
      elif nic_op == constants.DDM_ADD:
5303
        # add a new nic
5304
        if 'mac' not in nic_dict:
5305
          mac = constants.VALUE_GENERATE
5306
        else:
5307
          mac = nic_dict['mac']
5308
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5309
          mac = self.cfg.GenerateMAC()
5310
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5311
                              bridge=nic_dict.get('bridge', None))
5312
        instance.nics.append(new_nic)
5313
        result.append(("nic.%d" % (len(instance.nics) - 1),
5314
                       "add:mac=%s,ip=%s,bridge=%s" %
5315
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
5316
      else:
5317
        # change a given nic
5318
        for key in 'mac', 'ip', 'bridge':
5319
          if key in nic_dict:
5320
            setattr(instance.nics[nic_op], key, nic_dict[key])
5321
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5322

    
5323
    # hvparams changes
5324
    if self.op.hvparams:
5325
      instance.hvparams = self.hv_new
5326
      for key, val in self.op.hvparams.iteritems():
5327
        result.append(("hv/%s" % key, val))
5328

    
5329
    # beparams changes
5330
    if self.op.beparams:
5331
      instance.beparams = self.be_inst
5332
      for key, val in self.op.beparams.iteritems():
5333
        result.append(("be/%s" % key, val))
5334

    
5335
    self.cfg.Update(instance)
5336

    
5337
    return result
5338

    
5339

    
5340
class LUQueryExports(NoHooksLU):
5341
  """Query the exports list
5342

5343
  """
5344
  _OP_REQP = ['nodes']
5345
  REQ_BGL = False
5346

    
5347
  def ExpandNames(self):
5348
    self.needed_locks = {}
5349
    self.share_locks[locking.LEVEL_NODE] = 1
5350
    if not self.op.nodes:
5351
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5352
    else:
5353
      self.needed_locks[locking.LEVEL_NODE] = \
5354
        _GetWantedNodes(self, self.op.nodes)
5355

    
5356
  def CheckPrereq(self):
5357
    """Check prerequisites.
5358

5359
    """
5360
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5361

    
5362
  def Exec(self, feedback_fn):
5363
    """Compute the list of all the exported system images.
5364

5365
    @rtype: dict
5366
    @return: a dictionary with the structure node->(export-list)
5367
        where export-list is a list of the instances exported on
5368
        that node.
5369

5370
    """
5371
    rpcresult = self.rpc.call_export_list(self.nodes)
5372
    result = {}
5373
    for node in rpcresult:
5374
      if rpcresult[node].failed:
5375
        result[node] = False
5376
      else:
5377
        result[node] = rpcresult[node].data
5378

    
5379
    return result
5380

    
5381

    
5382
class LUExportInstance(LogicalUnit):
5383
  """Export an instance to an image in the cluster.
5384

5385
  """
5386
  HPATH = "instance-export"
5387
  HTYPE = constants.HTYPE_INSTANCE
5388
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
5389
  REQ_BGL = False
5390

    
5391
  def ExpandNames(self):
5392
    self._ExpandAndLockInstance()
5393
    # FIXME: lock only instance primary and destination node
5394
    #
5395
    # Sad but true, for now we have do lock all nodes, as we don't know where
5396
    # the previous export might be, and and in this LU we search for it and
5397
    # remove it from its current node. In the future we could fix this by:
5398
    #  - making a tasklet to search (share-lock all), then create the new one,
5399
    #    then one to remove, after
5400
    #  - removing the removal operation altoghether
5401
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5402

    
5403
  def DeclareLocks(self, level):
5404
    """Last minute lock declaration."""
5405
    # All nodes are locked anyway, so nothing to do here.
5406

    
5407
  def BuildHooksEnv(self):
5408
    """Build hooks env.
5409

5410
    This will run on the master, primary node and target node.
5411

5412
    """
5413
    env = {
5414
      "EXPORT_NODE": self.op.target_node,
5415
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5416
      }
5417
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5418
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5419
          self.op.target_node]
5420
    return env, nl, nl
5421

    
5422
  def CheckPrereq(self):
5423
    """Check prerequisites.
5424

5425
    This checks that the instance and node names are valid.
5426

5427
    """
5428
    instance_name = self.op.instance_name
5429
    self.instance = self.cfg.GetInstanceInfo(instance_name)
5430
    assert self.instance is not None, \
5431
          "Cannot retrieve locked instance %s" % self.op.instance_name
5432

    
5433
    self.dst_node = self.cfg.GetNodeInfo(
5434
      self.cfg.ExpandNodeName(self.op.target_node))
5435

    
5436
    if self.dst_node is None:
5437
      # This is wrong node name, not a non-locked node
5438
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5439

    
5440
    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        result = self.rpc.call_instance_start(src_node, instance, None)
        if result.failed or not result.data:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        result = self.rpc.call_blockdev_remove(src_node, dev)
        if result.failed or not result.data:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
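
    # The return value is a list of (path, tag) pairs; with purely
    # hypothetical names, a search for "^prod" could yield something like
    # [("/cluster", "prod"), ("/instances/instance1.example.com", "prod-web")].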


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _ALLO_KEYS or _RELO_KEYS class
      attribute, depending on the mode, are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        "offline": ninfo.offline,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
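
    # Sketch of the resulting top-level structure of self.in_data (all
    # values below are placeholders, not real cluster data):
    #   {
    #     "version": 1,
    #     "cluster_name": "cluster.example.com",
    #     "cluster_tags": [],
    #     "enable_hypervisors": ["xen-pvm"],
    #     "nodes": {"node1.example.com": {"total_memory": 4096, ...}},
    #     "instances": {"instance1.example.com": {"memory": 512, ...}},
    #   }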

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
6014

    
6015
  def _BuildInputData(self):
6016
    """Build input data structures.
6017

6018
    """
6019
    self._ComputeClusterData()
6020

    
6021
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6022
      self._AddNewInstance()
6023
    else:
6024
      self._AddRelocateInstance()
6025

    
6026
    self.in_text = serializer.Dump(self.in_data)
6027

    
6028
  def Run(self, name, validate=True, call_fn=None):
6029
    """Run an instance allocator and return the results.
6030

6031
    """
6032
    if call_fn is None:
6033
      call_fn = self.lu.rpc.call_iallocator_runner
6034
    data = self.in_text
6035

    
6036
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6037
    result.Raise()
6038

    
6039
    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6040
      raise errors.OpExecError("Invalid result from master iallocator runner")
6041

    
6042
    rcode, stdout, stderr, fail = result.data
6043

    
6044
    if rcode == constants.IARUN_NOTFOUND:
6045
      raise errors.OpExecError("Can't find allocator '%s'" % name)
6046
    elif rcode == constants.IARUN_FAILURE:
6047
      raise errors.OpExecError("Instance allocator call failed: %s,"
6048
                               " output: %s" % (fail, stdout+stderr))
6049
    self.out_text = stdout
6050
    if validate:
6051
      self._ValidateResult()
6052

    
6053
  def _ValidateResult(self):
6054
    """Process the allocator results.
6055

6056
    This will process and if successful save the result in
6057
    self.out_data and the other parameters.
6058

6059
    """
6060
    try:
6061
      rdict = serializer.Load(self.out_text)
6062
    except Exception, err:
6063
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6064

    
6065
    if not isinstance(rdict, dict):
6066
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
6067

    
6068
    for key in "success", "info", "nodes":
6069
      if key not in rdict:
6070
        raise errors.OpExecError("Can't parse iallocator results:"
6071
                                 " missing key '%s'" % key)
6072
      setattr(self, key, rdict[key])
6073

    
6074
    if not isinstance(rdict["nodes"], list):
6075
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6076
                               " is not a list")
6077
    self.out_data = rdict
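
    # A minimal reply that passes the checks above, once parsed by
    # serializer.Load, would look like (node names are placeholders):
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node3.example.com"]}
    # after which self.success, self.info and self.nodes carry these values.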


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result