root / lib / cmdlib.py @ eb1742d5

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity of the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass
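
  # A minimal illustrative sketch (not part of the original file): a typical
  # CheckArguments override normalizes an optional opcode attribute so that
  # later methods can rely on it being present.  The "force" attribute used
  # here is hypothetical.
  #
  #   def CheckArguments(self):
  #     if not hasattr(self.op, "force"):
  #       self.op.force = False
  #     elif not isinstance(self.op.force, bool):
  #       raise errors.OpPrereqError("Parameter 'force' must be a boolean")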

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_' as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If the hook should run on no nodes, return an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
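
  # Illustrative effect (hypothetical names): if self.op.instance_name is the
  # short name "web1" and the configuration knows "web1.example.com", then
  # after calling this helper both self.op.instance_name and
  # self.needed_locks[locking.LEVEL_INSTANCE] hold "web1.example.com", while
  # an unknown name raises errors.OpPrereqError instead.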

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called from DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check that we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)
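
# Illustrative usage sketch (not part of the original file): query-style LUs
# typically expand user-supplied names with these helpers.  Note that
# _GetWantedNodes must be given a non-empty list, while _GetWantedInstances
# falls back to all instances for an empty one.  The opcode attributes below
# are hypothetical.
#
#   def CheckPrereq(self):
#     self.wanted_instances = _GetWantedInstances(self, self.op.instances)
#     if self.op.nodes:
#       self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
#     else:
#       self.wanted_nodes = self.cfg.GetNodeList()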


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
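
# Example of the resulting environment (illustrative values only): calling
# _BuildInstanceHookEnv("inst1.example.com", "node1.example.com", [],
# "debian-etch", "up", 512, 1,
# [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")]) yields:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#     "INSTANCE_NIC_COUNT": 1,
#   }
#
# The GANETI_ prefix is added later by the hooks runner.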


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance-related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

    
566
  def ExpandNames(self):
567
    self.needed_locks = {
568
      locking.LEVEL_NODE: locking.ALL_SET,
569
      locking.LEVEL_INSTANCE: locking.ALL_SET,
570
    }
571
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572

    
573
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574
                  node_result, feedback_fn, master_files):
575
    """Run multiple tests against a node.
576

577
    Test list:
578

579
      - compares ganeti version
580
      - checks vg existance and size > 20G
581
      - checks config file checksum
582
      - checks ssh to other nodes
583

584
    @type nodeinfo: L{objects.Node}
585
    @param nodeinfo: the node to check
586
    @param file_list: required list of files
587
    @param local_cksum: dictionary of local files and their checksums
588
    @param node_result: the results from the node
589
    @param feedback_fn: function used to accumulate results
590
    @param master_files: list of files that only masters should have
591

592
    """
593
    node = nodeinfo.name
594

    
595
    # main result, node_result should be a non-empty dict
596
    if not node_result or not isinstance(node_result, dict):
597
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598
      return True
599

    
600
    # compares ganeti version
601
    local_version = constants.PROTOCOL_VERSION
602
    remote_version = node_result.get('version', None)
603
    if not remote_version:
604
      feedback_fn("  - ERROR: connection to %s failed" % (node))
605
      return True
606

    
607
    if local_version != remote_version:
608
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609
                      (local_version, node, remote_version))
610
      return True
611

    
612
    # checks vg existance and size > 20G
613

    
614
    bad = False
615
    vglist = node_result.get(constants.NV_VGLIST, None)
616
    if not vglist:
617
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618
                      (node,))
619
      bad = True
620
    else:
621
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622
                                            constants.MIN_VG_SIZE)
623
      if vgstatus:
624
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625
        bad = True
626

    
627
    # checks config file checksum
628

    
629
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
630
    if not isinstance(remote_cksum, dict):
631
      bad = True
632
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
633
    else:
634
      for file_name in file_list:
635
        node_is_mc = nodeinfo.master_candidate
636
        must_have_file = file_name not in master_files
637
        if file_name not in remote_cksum:
638
          if node_is_mc or must_have_file:
639
            bad = True
640
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
641
        elif remote_cksum[file_name] != local_cksum[file_name]:
642
          if node_is_mc or must_have_file:
643
            bad = True
644
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645
          else:
646
            # not candidate and this is not a must-have file
647
            bad = True
648
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649
                        " '%s'" % file_name)
650
        else:
651
          # all good, except non-master/non-must have combination
652
          if not node_is_mc and not must_have_file:
653
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
654
                        " candidates" % file_name)
655

    
656
    # checks ssh to any
657

    
658
    if constants.NV_NODELIST not in node_result:
659
      bad = True
660
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661
    else:
662
      if node_result[constants.NV_NODELIST]:
663
        bad = True
664
        for node in node_result[constants.NV_NODELIST]:
665
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666
                          (node, node_result[constants.NV_NODELIST][node]))
667

    
668
    if constants.NV_NODENETTEST not in node_result:
669
      bad = True
670
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671
    else:
672
      if node_result[constants.NV_NODENETTEST]:
673
        bad = True
674
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675
        for node in nlist:
676
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677
                          (node, node_result[constants.NV_NODENETTEST][node]))
678

    
679
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680
    if isinstance(hyp_result, dict):
681
      for hv_name, hv_result in hyp_result.iteritems():
682
        if hv_result is not None:
683
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684
                      (hv_name, hv_result))
685
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to take over,
      # should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
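
  # Worked example (illustrative numbers): suppose node A reports
  # mfree=1024 MB and is secondary for two auto-balanced instances whose
  # primary is node B, with BE_MEMORY of 512 and 768.  If node B failed,
  # node A would need 512 + 768 = 1280 MB, which exceeds its 1024 MB of free
  # memory, so the loop above reports an N+1 error for the (A, B) pair.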

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

    
810
  def Exec(self, feedback_fn):
811
    """Verify integrity of cluster, performing various test on nodes.
812

813
    """
814
    bad = False
815
    feedback_fn("* Verifying global settings")
816
    for msg in self.cfg.VerifyConfig():
817
      feedback_fn("  - ERROR: %s" % msg)
818

    
819
    vg_name = self.cfg.GetVGName()
820
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
822
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824
    i_non_redundant = [] # Non redundant instances
825
    i_non_a_balanced = [] # Non auto-balanced instances
826
    node_volume = {}
827
    node_instance = {}
828
    node_info = {}
829
    instance_cfg = {}
830

    
831
    # FIXME: verify OS list
832
    # do local checksums
833
    master_files = [constants.CLUSTER_CONF_FILE]
834

    
835
    file_names = ssconf.SimpleStore().GetFileList()
836
    file_names.append(constants.SSL_CERT_FILE)
837
    file_names.extend(master_files)
838

    
839
    local_checksums = utils.FingerprintFiles(file_names)
840

    
841
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842
    node_verify_param = {
843
      constants.NV_FILELIST: file_names,
844
      constants.NV_NODELIST: nodelist,
845
      constants.NV_HYPERVISOR: hypervisors,
846
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847
                                  node.secondary_ip) for node in nodeinfo],
848
      constants.NV_LVLIST: vg_name,
849
      constants.NV_INSTANCELIST: hypervisors,
850
      constants.NV_VGLIST: None,
851
      constants.NV_VERSION: None,
852
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853
      }
854
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855
                                           self.cfg.GetClusterName())
856

    
857
    cluster = self.cfg.GetClusterInfo()
858
    master_node = self.cfg.GetMasterNode()
859
    for node_i in nodeinfo:
860
      node = node_i.name
861
      nresult = all_nvinfo[node].data
862

    
863
      if node == master_node:
864
        ntype = "master"
865
      elif node_i.master_candidate:
866
        ntype = "master candidate"
867
      else:
868
        ntype = "regular"
869
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870

    
871
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
872
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
873
        bad = True
874
        continue
875

    
876
      result = self._VerifyNode(node_i, file_names, local_checksums,
877
                                nresult, feedback_fn, master_files)
878
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, lvdata.encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        self.LogWarning("Connection to node %s failed: %s" %
                        (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check parameters.

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given parameters don't conflict and
    whether the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name is not None, check the given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.CheckBEParams(self.op.beparams)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      node_info = self.cfg.GetAllNodesInfo().values()
      num_candidates = len([node for node in node_info
                            if node.master_candidate])
      num_nodes = len(node_info)
      if num_candidates < self.op.candidate_pool_size:
        random.shuffle(node_info)
        for node in node_info:
          if num_candidates >= self.op.candidate_pool_size:
            break
          if node.master_candidate:
            continue
          node.master_candidate = True
          self.LogInfo("Promoting node %s to master candidate", node.name)
          self.cfg.Update(node)
          self.context.ReaddNode(node)
          num_candidates += 1
      elif num_candidates > self.op.candidate_pool_size:
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
                     " of candidate_pool_size (%d)" %
                     (num_candidates, self.op.candidate_pool_size))


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
1469

    
1470

    
1471
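# The idx values above select fields of the tuple returned by
# call_blockdev_find: position 5 is the overall is_degraded flag, position 6
# the local-disk (ldisk) status.  A hypothetical caller interested only in
# the local storage state of a device on its primary node would do:
#
#   if not _CheckDiskConsistency(self, dev, instance.primary_node,
#                                True, ldisk=True):
#     raise errors.OpExecError("device %s is degraded" % dev.iv_name)

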
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @returns: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    node_info = self.cfg.GetAllNodesInfo().values()
    num_candidates = len([n for n in node_info
                          if n.master_candidate])
    num_nodes = len(node_info)
    random.shuffle(node_info)
    for node in node_info:
      if num_candidates >= cp_size or num_candidates >= num_nodes:
        break
      if node.master_candidate:
        continue
      node.master_candidate = True
      self.LogInfo("Promoting node %s to master candidate", node.name)
      self.cfg.Update(node)
      self.context.ReaddNode(node)
      num_candidates += 1


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    node_info = self.cfg.GetAllNodesInfo().values()
    num_candidates = len([n for n in node_info
                          if n.master_candidate])
    master_candidate = num_candidates < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if result.failed or not result.data:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if result[node].failed or not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if not hasattr(self.op, 'master_candidate'):
      raise errors.OpPrereqError("Please pass at least one modification")
    self.op.master_candidate = bool(self.op.master_candidate)

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    if self.op.master_candidate == False:
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      node_info = self.cfg.GetAllNodesInfo().values()
      num_candidates = len([node for node in node_info
                            if node.master_candidate])
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_name)

    result = []

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      result.append(("master_candidate", str(self.op.master_candidate)))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if self.op.node_name != self.cfg.GetMasterNode():
      self.context.ReaddNode(node)

    return result


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": cluster.hvparams,
      "beparams": cluster.beparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if result.failed or not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1)",
                           inst_disk.iv_name, node)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if result.failed or not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2)",
                           inst_disk.iv_name, node)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


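# Minimal usage sketch, mirroring LUActivateInstanceDisks.Exec above and the
# _StartInstanceDisks helper below: callers unpack the (disks_ok, device_info)
# pair and treat a False first element as a failure:
#
#   disks_ok, device_info = _AssembleInstanceDisks(self, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")

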
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if ins_l.failed or not isinstance(ins_l.data, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they cause the function to return False.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      if result.failed or not result.data:
        logging.error("Could not shutdown block device %s on node %s",
                      disk.iv_name, node)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


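# Hypothetical caller sketch: a path that must tolerate shutdown errors on a
# possibly unreachable primary node (e.g. a failover-style operation) would
# pass ignore_primary=True, while the callers above and below use the strict
# default:
#
#   _ShutdownInstanceDisks(self, instance, ignore_primary=True)

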
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  nodeinfo[node].Raise()
  free_mem = nodeinfo[node].data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


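# Usage sketch: LUStartupInstance.CheckPrereq below performs exactly this
# check before starting an instance, asking for the instance's configured
# memory on its primary node:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)

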
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance, extra_args)
    if result.failed or not result.data:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type, extra_args)
      if result.failed or not result.data:
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
      if result.failed or not result.data:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    if result.failed or not result.data:
      self.proc.LogWarning("Could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      if result.failed or not result.data:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    if result.failed or not result.data:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


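# Editorial note, not part of the original module: the locking pattern above
# is the one most instance LUs in this file follow.  ExpandNames() takes the
# instance lock and leaves the node level empty with LOCKS_REPLACE, and
# DeclareLocks() later calls _LockInstancesNodes() so that exactly the
# instance's primary/secondary nodes end up locked.  A condensed sketch:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()
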
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state", "admin_ram",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    "(disk).(size)/([0-9]+)",
                                    "(disk).(sizes)",
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
                                    "(nic).(macs|ips|bridges)",
                                    "(disk|nic).(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


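# Editorial note, not part of the original module: output fields are either
# "static" (answered from the configuration, including the regex-style fields
# such as "disk.size/0" or "nic.macs" matched by _FIELDS_STATIC above) or
# "dynamic" ("oper_state", "oper_ram", "status"), which require asking the
# nodes and therefore locking.  A purely illustrative query, assuming the
# OpQueryInstances opcode defined elsewhere in Ganeti:
#
#   op = opcodes.OpQueryInstances(output_fields=["name", "disk.sizes",
#                                                "oper_state"],
#                                 names=[])
#   # "oper_state" is dynamic, so do_locking is True and the primary nodes
#   # are contacted via rpc.call_all_instances_info().
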
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    if result.failed or not result.data:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None)
      if result.failed or not result.data:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


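# Editorial note, not part of the original module: the failover above is a
# fixed sequence and is worth keeping in mind when reading the error paths:
#
#   1. check disk consistency on the target (unless ignore_consistency)
#   2. shut the instance down on the current primary
#   3. shut down its disks everywhere (ignore_primary=True)
#   4. flip instance.primary_node and write the config (cfg.Update)
#   5. if the instance was marked "up": reassemble the disks on the new
#      primary and start it, rolling the disks back down on failure
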
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if new_id.failed or not new_id.data:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if new_id.failed or not new_id.data:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


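# Editorial note, not part of the original module: the two helpers above
# differ only in when they start creating devices.  On the primary node the
# whole tree is always created; on a secondary node nothing is created until
# some device in the tree answers CreateOnSecondary(), from which point its
# children are forced as well, so e.g. a DRBD8 device ends up with its data
# and metadata LVs present on both nodes.
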
def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate logical volume names, one for each of the given
  extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


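# Editorial note, not part of the original module: an illustrative call, with
# "<uuid>" standing in for whatever unique ID the configuration generates:
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   => ["<uuid>.disk0_data", "<uuid>.disk0_meta"]
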
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


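# Editorial note, not part of the original module: the resulting disk tree
# for one DRBD8 disk of SIZE megabytes looks like this (sizes in MB):
#
#   LD_DRBD8 (size=SIZE, logical_id=(pnode, snode, port, p_minor, s_minor,
#                                    secret))
#    +- LD_LV data volume     (size=SIZE)
#    +- LD_LV metadata volume (size=128)
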
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


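# Editorial note, not part of the original module: an illustrative call for a
# plain (LVM-only) layout with two disks; names and sizes are invented:
#
#   disks = _GenerateDiskTemplate(self, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1.example.com",
#                                 [], [{"size": 1024}, {"size": 2048}],
#                                 None, None, 0)
#   # => two LD_LV Disk objects with iv_name "disk/0" and "disk/1"
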
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if result.failed or not result.data:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result.data[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      result = lu.rpc.call_blockdev_remove(node, disk)
      if result.failed or not result.data:
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of the disk template and disk sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


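# Editorial note, not part of the original module: a quick worked example.
# For a DRBD8 instance with disks [{"size": 1024}, {"size": 2048}] the
# required space in the volume group is (1024 + 128) + (2048 + 128) = 3328 MB
# on each of the two nodes; for DT_PLAIN the same disks need 3072 MB on the
# primary only, and DT_DISKLESS/DT_FILE place no requirement on the VG.
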
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    info.Raise()
    if not info.data or not isinstance(info.data, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info.data))
    if not info.data[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info.data[1])


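# Editorial note, not part of the original module: each node is expected to
# answer call_hypervisor_validate_params with a (status, message) pair, which
# is why the loop above checks info.data[0] and reports info.data[1].  For
# example a node rejecting an invalid parameter might return something like
# (False, "kernel_path must be an absolute path"); the exact message text is
# hypervisor-specific and only illustrative here.
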
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.CheckBEParams(self.op.beparams)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

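  # Editorial note, not part of the original module: the opcode's "nics" and
  # "disks" parameters processed in ExpandNames() above are lists of plain
  # dicts.  A hypothetical, purely illustrative opcode fragment:
  #
  #   nics=[{"ip": "auto", "bridge": "xen-br0"}],   # mac defaults to "auto"
  #   disks=[{"size": 10240}, {"size": 2048, "mode": "rw"}]
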
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                     src_path)

      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
        result.Raise()
        if not result.data:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not start instance")


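# Editorial note, not part of the original module: the creation Exec() above
# is ordered so that every failure path leaves the cluster consistent:
# generate MACs/ports, build the disk objects, create the physical disks
# (reverting on failure), only then add the instance to the config and
# release the temporary DRBD minor reservations and node locks, wait for the
# mirrors to sync, run the OS create/import scripts, and finally start the
# instance if requested.
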
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


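# Editorial note, not part of the original module: the LU does not open the
# console itself; it only builds the command for the client to exec, roughly
# "ssh -t root@<primary node> <hypervisor console command>", where the inner
# command comes from the hypervisor abstraction (GetShellCommandForConsole)
# and the exact ssh options are whatever SshRunner.BuildCmd produces.
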
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
4305
      else:
4306
        raise errors.ProgrammerError("Unhandled disk replace mode")
4307

    
4308
    if not self.op.disks:
4309
      self.op.disks = range(len(instance.disks))
4310

    
4311
    for disk_idx in self.op.disks:
4312
      instance.FindDisk(disk_idx)
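      # FindDisk is expected to raise OpPrereqError itself for an invalid
      # index, so this loop doubles as validation of self.op.disks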
4313

    
4314
  def _ExecD8DiskOnly(self, feedback_fn):
4315
    """Replace a disk on the primary or secondary for dbrd8.
4316

4317
    The algorithm for replace is quite complicated:
4318

4319
      1. for each disk to be replaced:
4320

4321
        1. create new LVs on the target node with unique names
4322
        1. detach old LVs from the drbd device
4323
        1. rename old LVs to name_replaced.<time_t>
4324
        1. rename new LVs to old LVs
4325
        1. attach the new LVs (with the old names now) to the drbd device
4326

4327
      1. wait for sync across all devices
4328

4329
      1. for each modified disk:
4330

4331
        1. remove old LVs (which have the name name_replaced.<time_t>)
4332

4333
    Failures are not very well handled.
4334

4335
    """
4336
    steps_total = 6
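    # the six steps below: device checks, peer consistency, new storage,
    # drbd reconfiguration, resync, and removal of the old storage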
4337
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4338
    instance = self.instance
4339
    iv_names = {}
4340
    vgname = self.cfg.GetVGName()
4341
    # start of work
4342
    cfg = self.cfg
4343
    tgt_node = self.tgt_node
4344
    oth_node = self.oth_node
4345

    
4346
    # Step: check device activation
4347
    self.proc.LogStep(1, steps_total, "check device existence")
4348
    info("checking volume groups")
4349
    my_vg = cfg.GetVGName()
4350
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4351
    if not results:
4352
      raise errors.OpExecError("Can't list volume groups on the nodes")
4353
    for node in oth_node, tgt_node:
4354
      res = results[node]
4355
      if res.failed or not res.data or my_vg not in res.data:
4356
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4357
                                 (my_vg, node))
4358
    for idx, dev in enumerate(instance.disks):
4359
      if idx not in self.op.disks:
4360
        continue
4361
      for node in tgt_node, oth_node:
4362
        info("checking disk/%d on %s" % (idx, node))
4363
        cfg.SetDiskID(dev, node)
4364
        if not self.rpc.call_blockdev_find(node, dev):
4365
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4366
                                   (idx, node))
4367

    
4368
    # Step: check other node consistency
4369
    self.proc.LogStep(2, steps_total, "check peer consistency")
4370
    for idx, dev in enumerate(instance.disks):
4371
      if idx not in self.op.disks:
4372
        continue
4373
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4374
      if not _CheckDiskConsistency(self, dev, oth_node,
4375
                                   oth_node==instance.primary_node):
4376
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4377
                                 " to replace disks on this node (%s)" %
4378
                                 (oth_node, tgt_node))
4379

    
4380
    # Step: create new storage
4381
    self.proc.LogStep(3, steps_total, "allocate new storage")
4382
    for idx, dev in enumerate(instance.disks):
4383
      if idx not in self.op.disks:
4384
        continue
4385
      size = dev.size
4386
      cfg.SetDiskID(dev, tgt_node)
4387
      lv_names = [".disk%d_%s" % (idx, suf)
4388
                  for suf in ["data", "meta"]]
4389
      names = _GenerateUniqueNames(self, lv_names)
4390
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4391
                             logical_id=(vgname, names[0]))
4392
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4393
                             logical_id=(vgname, names[1]))
4394
      new_lvs = [lv_data, lv_meta]
4395
      old_lvs = dev.children
4396
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
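      # remember the mapping, so that the later steps can detach, rename and
      # remove the right LVs for this device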
4397
      info("creating new local storage on %s for %s" %
4398
           (tgt_node, dev.iv_name))
4399
      # since we *always* want to create this LV, we use the
4400
      # _Create...OnPrimary (which forces the creation), even if we
4401
      # are talking about the secondary node
4402
      for new_lv in new_lvs:
4403
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4404
                                        _GetInstanceInfoText(instance)):
4405
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4406
                                   " node '%s'" %
4407
                                   (new_lv.logical_id[1], tgt_node))
4408

    
4409
    # Step: for each lv, detach+rename*2+attach
4410
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4411
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4412
      info("detaching %s drbd from local storage" % dev.iv_name)
4413
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4414
      result.Raise()
4415
      if not result.data:
4416
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4417
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4418
      #dev.children = []
4419
      #cfg.Update(instance)
4420

    
4421
      # ok, we created the new LVs, so now we know we have the needed
4422
      # storage; as such, we proceed on the target node to rename
4423
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4424
      # using the assumption that logical_id == physical_id (which in
4425
      # turn is the unique_id on that node)
4426

    
4427
      # FIXME(iustin): use a better name for the replaced LVs
4428
      temp_suffix = int(time.time())
4429
      ren_fn = lambda d, suff: (d.physical_id[0],
4430
                                d.physical_id[1] + "_replaced-%s" % suff)
4431
      # build the rename list based on what LVs exist on the node
4432
      rlist = []
4433
      for to_ren in old_lvs:
4434
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4435
        if not find_res.failed and find_res.data is not None: # device exists
4436
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4437

    
4438
      info("renaming the old LVs on the target node")
4439
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4440
      result.Raise()
4441
      if not result.data:
4442
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4443
      # now we rename the new LVs to the old LVs
4444
      info("renaming the new LVs on the target node")
4445
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4446
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4447
      result.Raise()
4448
      if not result.data:
4449
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4450

    
4451
      for old, new in zip(old_lvs, new_lvs):
4452
        new.logical_id = old.logical_id
4453
        cfg.SetDiskID(new, tgt_node)
4454

    
4455
      for disk in old_lvs:
4456
        disk.logical_id = ren_fn(disk, temp_suffix)
4457
        cfg.SetDiskID(disk, tgt_node)
4458

    
4459
      # now that the new lvs have the old name, we can add them to the device
4460
      info("adding new mirror component on %s" % tgt_node)
4461
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4462
      if result.failed or not result.data:
4463
        for new_lv in new_lvs:
4464
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4465
          if result.failed or not result.data:
4466
            warning("Can't rollback device %s", hint="manually cleanup unused"
4467
                    " logical volumes")
4468
        raise errors.OpExecError("Can't add local storage to drbd")
4469

    
4470
      dev.children = new_lvs
4471
      cfg.Update(instance)
4472

    
4473
    # Step: wait for sync
4474

    
4475
    # this can fail as the old devices are degraded and _WaitForSync
4476
    # does a combined result over all disks, so we don't check its
4477
    # return value
4478
    self.proc.LogStep(5, steps_total, "sync devices")
4479
    _WaitForSync(self, instance, unlock=True)
4480

    
4481
    # so check manually all the devices
4482
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4483
      cfg.SetDiskID(dev, instance.primary_node)
4484
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4485
      if result.failed or result.data[5]:
4486
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4487

    
4488
    # Step: remove old storage
4489
    self.proc.LogStep(6, steps_total, "removing old storage")
4490
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4491
      info("remove logical volumes for %s" % name)
4492
      for lv in old_lvs:
4493
        cfg.SetDiskID(lv, tgt_node)
4494
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
4495
        if result.failed or not result.data:
4496
          warning("Can't remove old LV", hint="manually remove unused LVs")
4497
          continue
4498

    
4499
  def _ExecD8Secondary(self, feedback_fn):
4500
    """Replace the secondary node for drbd8.
4501

4502
    The algorithm for replace is quite complicated:
4503
      - for all disks of the instance:
4504
        - create new LVs on the new node with same names
4505
        - shutdown the drbd device on the old secondary
4506
        - disconnect the drbd network on the primary
4507
        - create the drbd device on the new secondary
4508
        - network attach the drbd on the primary, using an artifice:
4509
          the drbd code for Attach() will connect to the network if it
4510
          finds a device which is connected to the good local disks but
4511
          not network enabled
4512
      - wait for sync across all devices
4513
      - remove all disks from the old secondary
4514

4515
    Failures are not very well handled.
4516

4517
    """
4518
    steps_total = 6
4519
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4520
    instance = self.instance
4521
    iv_names = {}
4522
    vgname = self.cfg.GetVGName()
4523
    # start of work
4524
    cfg = self.cfg
4525
    old_node = self.tgt_node
4526
    new_node = self.new_node
4527
    pri_node = instance.primary_node
4528

    
4529
    # Step: check device activation
4530
    self.proc.LogStep(1, steps_total, "check device existence")
4531
    info("checking volume groups")
4532
    my_vg = cfg.GetVGName()
4533
    results = self.rpc.call_vg_list([pri_node, new_node])
4534
    for node in pri_node, new_node:
4535
      res = results[node]
4536
      if res.failed or not res.data or my_vg not in res.data:
4537
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4538
                                 (my_vg, node))
4539
    for idx, dev in enumerate(instance.disks):
4540
      if idx not in self.op.disks:
4541
        continue
4542
      info("checking disk/%d on %s" % (idx, pri_node))
4543
      cfg.SetDiskID(dev, pri_node)
4544
      result = self.rpc.call_blockdev_find(pri_node, dev)
4545
      result.Raise()
4546
      if not result.data:
4547
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4548
                                 (idx, pri_node))
4549

    
4550
    # Step: check other node consistency
4551
    self.proc.LogStep(2, steps_total, "check peer consistency")
4552
    for idx, dev in enumerate(instance.disks):
4553
      if idx not in self.op.disks:
4554
        continue
4555
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4556
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4557
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4558
                                 " unsafe to replace the secondary" %
4559
                                 pri_node)
4560

    
4561
    # Step: create new storage
4562
    self.proc.LogStep(3, steps_total, "allocate new storage")
4563
    for idx, dev in enumerate(instance.disks):
4564
      size = dev.size
4565
      info("adding new local storage on %s for disk/%d" %
4566
           (new_node, idx))
4567
      # since we *always* want to create this LV, we use the
4568
      # _Create...OnPrimary (which forces the creation), even if we
4569
      # are talking about the secondary node
4570
      for new_lv in dev.children:
4571
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4572
                                        _GetInstanceInfoText(instance)):
4573
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4574
                                   " node '%s'" %
4575
                                   (new_lv.logical_id[1], new_node))
4576

    
4577
    # Step 4: drbd minors and drbd setup changes
4578
    # after this, we must manually remove the drbd minors on both the
4579
    # error and the success paths
4580
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4581
                                   instance.name)
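    # the minors list is parallel to instance.disks: one new minor on the new
    # node per disk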
4582
    logging.debug("Allocated minors %s" % (minors,))
4583
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4584
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4585
      size = dev.size
4586
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4587
      # create new devices on new_node
4588
      if pri_node == dev.logical_id[0]:
4589
        new_logical_id = (pri_node, new_node,
4590
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4591
                          dev.logical_id[5])
4592
      else:
4593
        new_logical_id = (new_node, pri_node,
4594
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4595
                          dev.logical_id[5])
4596
      iv_names[idx] = (dev, dev.children, new_logical_id)
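      # keep the old children and the new logical_id around, so that after the
      # switch we can update the configuration and remove the old LVs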
4597
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4598
                    new_logical_id)
4599
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4600
                              logical_id=new_logical_id,
4601
                              children=dev.children)
4602
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4603
                                        new_drbd, False,
4604
                                        _GetInstanceInfoText(instance)):
4605
        self.cfg.ReleaseDRBDMinors(instance.name)
4606
        raise errors.OpExecError("Failed to create new DRBD on"
4607
                                 " node '%s'" % new_node)
4608

    
4609
    for idx, dev in enumerate(instance.disks):
4610
      # we have new devices, shutdown the drbd on the old secondary
4611
      info("shutting down drbd for disk/%d on old node" % idx)
4612
      cfg.SetDiskID(dev, old_node)
4613
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
4614
      if result.failed or not result.data:
4615
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4616
                hint="Please cleanup this device manually as soon as possible")
4617

    
4618
    info("detaching primary drbds from the network (=> standalone)")
4619
    done = 0
4620
    for idx, dev in enumerate(instance.disks):
4621
      cfg.SetDiskID(dev, pri_node)
4622
      # set the network part of the physical (unique in bdev terms) id
4623
      # to None, meaning detach from network
4624
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4625
      # and 'find' the device, which will 'fix' it to match the
4626
      # standalone state
4627
      result = self.rpc.call_blockdev_find(pri_node, dev)
4628
      if not result.failed and result.data:
4629
        done += 1
4630
      else:
4631
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4632
                idx)
4633

    
4634
    if not done:
4635
      # no detaches succeeded (very unlikely)
4636
      self.cfg.ReleaseDRBDMinors(instance.name)
4637
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4638

    
4639
    # if we managed to detach at least one, we update all the disks of
4640
    # the instance to point to the new secondary
4641
    info("updating instance configuration")
4642
    for dev, _, new_logical_id in iv_names.itervalues():
4643
      dev.logical_id = new_logical_id
4644
      cfg.SetDiskID(dev, pri_node)
4645
    cfg.Update(instance)
4646
    # we can remove now the temp minors as now the new values are
4647
    # written to the config file (and therefore stable)
4648
    self.cfg.ReleaseDRBDMinors(instance.name)
4649

    
4650
    # and now perform the drbd attach
4651
    info("attaching primary drbds to new secondary (standalone => connected)")
4652
    failures = []
4653
    for idx, dev in enumerate(instance.disks):
4654
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4655
      # since the attach is smart, it's enough to 'find' the device,
4656
      # it will automatically activate the network, if the physical_id
4657
      # is correct
4658
      cfg.SetDiskID(dev, pri_node)
4659
      logging.debug("Disk to attach: %s", dev)
4660
      result = self.rpc.call_blockdev_find(pri_node, dev)
4661
      if result.failed or not result.data:
4662
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4663
                "please do a gnt-instance info to see the status of disks")
4664

    
4665
    # this can fail as the old devices are degraded and _WaitForSync
4666
    # does a combined result over all disks, so we don't check its
4667
    # return value
4668
    self.proc.LogStep(5, steps_total, "sync devices")
4669
    _WaitForSync(self, instance, unlock=True)
4670

    
4671
    # so check manually all the devices
4672
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4673
      cfg.SetDiskID(dev, pri_node)
4674
      result = self.rpc.call_blockdev_find(pri_node, dev)
4675
      result.Raise()
4676
      if result.data[5]:
4677
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4678

    
4679
    self.proc.LogStep(6, steps_total, "removing old storage")
4680
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4681
      info("remove logical volumes for disk/%d" % idx)
4682
      for lv in old_lvs:
4683
        cfg.SetDiskID(lv, old_node)
4684
        result = self.rpc.call_blockdev_remove(old_node, lv)
4685
        if result.failed or not result.data:
4686
          warning("Can't remove LV on old secondary",
4687
                  hint="Cleanup stale volumes by hand")
4688

    
4689
  def Exec(self, feedback_fn):
4690
    """Execute disk replacement.
4691

4692
    This dispatches the disk replacement to the appropriate handler.
4693

4694
    """
4695
    instance = self.instance
4696

    
4697
    # Activate the instance disks if we're replacing them on a down instance
4698
    if instance.status == "down":
4699
      _StartInstanceDisks(self, instance, True)
4700

    
4701
    if instance.disk_template == constants.DT_DRBD8:
4702
      if self.op.remote_node is None:
4703
        fn = self._ExecD8DiskOnly
4704
      else:
4705
        fn = self._ExecD8Secondary
4706
    else:
4707
      raise errors.ProgrammerError("Unhandled disk replacement case")
4708

    
4709
    ret = fn(feedback_fn)
4710

    
4711
    # Deactivate the instance disks if we're replacing them on a down instance
4712
    if instance.status == "down":
4713
      _SafeShutdownInstanceDisks(self, instance)
4714

    
4715
    return ret
4716

    
4717

    
4718
class LUGrowDisk(LogicalUnit):
4719
  """Grow a disk of an instance.
4720

4721
  """
4722
  HPATH = "disk-grow"
4723
  HTYPE = constants.HTYPE_INSTANCE
4724
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4725
  REQ_BGL = False
4726

    
4727
  def ExpandNames(self):
4728
    self._ExpandAndLockInstance()
4729
    self.needed_locks[locking.LEVEL_NODE] = []
4730
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
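    # the actual node locks are computed in DeclareLocks, once the instance
    # lock is held and its nodes are known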
4731

    
4732
  def DeclareLocks(self, level):
4733
    if level == locking.LEVEL_NODE:
4734
      self._LockInstancesNodes()
4735

    
4736
  def BuildHooksEnv(self):
4737
    """Build hooks env.
4738

4739
    This runs on the master, the primary and all the secondaries.
4740

4741
    """
4742
    env = {
4743
      "DISK": self.op.disk,
4744
      "AMOUNT": self.op.amount,
4745
      }
4746
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4747
    nl = [
4748
      self.cfg.GetMasterNode(),
4749
      self.instance.primary_node,
4750
      ]
4751
    return env, nl, nl
4752

    
4753
  def CheckPrereq(self):
4754
    """Check prerequisites.
4755

4756
    This checks that the instance is in the cluster.
4757

4758
    """
4759
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4760
    assert instance is not None, \
4761
      "Cannot retrieve locked instance %s" % self.op.instance_name
4762

    
4763
    self.instance = instance
4764

    
4765
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4766
      raise errors.OpPrereqError("Instance's disk layout does not support"
4767
                                 " growing.")
4768

    
4769
    self.disk = instance.FindDisk(self.op.disk)
4770

    
4771
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4772
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4773
                                       instance.hypervisor)
4774
    for node in nodenames:
4775
      info = nodeinfo[node]
4776
      if info.failed or not info.data:
4777
        raise errors.OpPrereqError("Cannot get current information"
4778
                                   " from node '%s'" % node)
4779
      vg_free = info.data.get('vg_free', None)
4780
      if not isinstance(vg_free, int):
4781
        raise errors.OpPrereqError("Can't compute free disk space on"
4782
                                   " node %s" % node)
4783
      if self.op.amount > vg_free:
4784
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4785
                                   " %d MiB available, %d MiB required" %
4786
                                   (node, vg_free, self.op.amount))
4787

    
4788
  def Exec(self, feedback_fn):
4789
    """Execute disk grow.
4790

4791
    """
4792
    instance = self.instance
4793
    disk = self.disk
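    # grow the device on all nodes that host it, secondaries before the
    # primary node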
4794
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4795
      self.cfg.SetDiskID(disk, node)
4796
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4797
      result.Raise()
4798
      if (not result.data or not isinstance(result.data, (list, tuple)) or
4799
          len(result.data) != 2):
4800
        raise errors.OpExecError("Grow request failed to node %s" % node)
4801
      elif not result.data[0]:
4802
        raise errors.OpExecError("Grow request failed to node %s: %s" %
4803
                                 (node, result.data[1]))
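    # only record the new size in the configuration once every node has
    # successfully grown the device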
4804
    disk.RecordGrow(self.op.amount)
4805
    self.cfg.Update(instance)
4806
    if self.op.wait_for_sync:
4807
      disk_abort = not _WaitForSync(self, instance)
4808
      if disk_abort:
4809
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4810
                             " status.\nPlease check the instance.")
4811

    
4812

    
4813
class LUQueryInstanceData(NoHooksLU):
4814
  """Query runtime instance data.
4815

4816
  """
4817
  _OP_REQP = ["instances", "static"]
4818
  REQ_BGL = False
4819

    
4820
  def ExpandNames(self):
4821
    self.needed_locks = {}
4822
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4823

    
4824
    if not isinstance(self.op.instances, list):
4825
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4826

    
4827
    if self.op.instances:
4828
      self.wanted_names = []
4829
      for name in self.op.instances:
4830
        full_name = self.cfg.ExpandInstanceName(name)
4831
        if full_name is None:
4832
          raise errors.OpPrereqError("Instance '%s' not known" %
4833
                                     self.op.instance_name)
4834
        self.wanted_names.append(full_name)
4835
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4836
    else:
4837
      self.wanted_names = None
4838
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4839

    
4840
    self.needed_locks[locking.LEVEL_NODE] = []
4841
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4842

    
4843
  def DeclareLocks(self, level):
4844
    if level == locking.LEVEL_NODE:
4845
      self._LockInstancesNodes()
4846

    
4847
  def CheckPrereq(self):
4848
    """Check prerequisites.
4849

4850
    This only checks the optional instance list against the existing names.
4851

4852
    """
4853
    if self.wanted_names is None:
4854
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4855

    
4856
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4857
                             in self.wanted_names]
4858
    return
4859

    
4860
  def _ComputeDiskStatus(self, instance, snode, dev):
4861
    """Compute block device status.
4862

4863
    """
4864
    static = self.op.static
4865
    if not static:
4866
      self.cfg.SetDiskID(dev, instance.primary_node)
4867
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4868
      dev_pstatus.Raise()
4869
      dev_pstatus = dev_pstatus.data
4870
    else:
4871
      dev_pstatus = None
4872

    
4873
    if dev.dev_type in constants.LDS_DRBD:
4874
      # we change the snode then (otherwise we use the one passed in)
4875
      if dev.logical_id[0] == instance.primary_node:
4876
        snode = dev.logical_id[1]
4877
      else:
4878
        snode = dev.logical_id[0]
4879

    
4880
    if snode and not static:
4881
      self.cfg.SetDiskID(dev, snode)
4882
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4883
      dev_sstatus.Raise()
4884
      dev_sstatus = dev_sstatus.data
4885
    else:
4886
      dev_sstatus = None
4887

    
4888
    if dev.children:
4889
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4890
                      for child in dev.children]
4891
    else:
4892
      dev_children = []
4893

    
4894
    data = {
4895
      "iv_name": dev.iv_name,
4896
      "dev_type": dev.dev_type,
4897
      "logical_id": dev.logical_id,
4898
      "physical_id": dev.physical_id,
4899
      "pstatus": dev_pstatus,
4900
      "sstatus": dev_sstatus,
4901
      "children": dev_children,
4902
      "mode": dev.mode,
4903
      }
4904

    
4905
    return data
4906

    
4907
  def Exec(self, feedback_fn):
4908
    """Gather and return data"""
4909
    result = {}
4910

    
4911
    cluster = self.cfg.GetClusterInfo()
4912

    
4913
    for instance in self.wanted_instances:
4914
      if not self.op.static:
4915
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4916
                                                  instance.name,
4917
                                                  instance.hypervisor)
4918
        remote_info.Raise()
4919
        remote_info = remote_info.data
4920
        if remote_info and "state" in remote_info:
4921
          remote_state = "up"
4922
        else:
4923
          remote_state = "down"
4924
      else:
4925
        remote_state = None
4926
      if instance.status == "down":
4927
        config_state = "down"
4928
      else:
4929
        config_state = "up"
4930

    
4931
      disks = [self._ComputeDiskStatus(instance, None, device)
4932
               for device in instance.disks]
4933

    
4934
      idict = {
4935
        "name": instance.name,
4936
        "config_state": config_state,
4937
        "run_state": remote_state,
4938
        "pnode": instance.primary_node,
4939
        "snodes": instance.secondary_nodes,
4940
        "os": instance.os,
4941
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4942
        "disks": disks,
4943
        "hypervisor": instance.hypervisor,
4944
        "network_port": instance.network_port,
4945
        "hv_instance": instance.hvparams,
4946
        "hv_actual": cluster.FillHV(instance),
4947
        "be_instance": instance.beparams,
4948
        "be_actual": cluster.FillBE(instance),
4949
        }
4950

    
4951
      result[instance.name] = idict
4952

    
4953
    return result
4954

    
4955

    
4956
class LUSetInstanceParams(LogicalUnit):
4957
  """Modifies an instances's parameters.
4958

4959
  """
4960
  HPATH = "instance-modify"
4961
  HTYPE = constants.HTYPE_INSTANCE
4962
  _OP_REQP = ["instance_name"]
4963
  REQ_BGL = False
4964

    
4965
  def CheckArguments(self):
4966
    if not hasattr(self.op, 'nics'):
4967
      self.op.nics = []
4968
    if not hasattr(self.op, 'disks'):
4969
      self.op.disks = []
4970
    if not hasattr(self.op, 'beparams'):
4971
      self.op.beparams = {}
4972
    if not hasattr(self.op, 'hvparams'):
4973
      self.op.hvparams = {}
4974
    self.op.force = getattr(self.op, "force", False)
4975
    if not (self.op.nics or self.op.disks or
4976
            self.op.hvparams or self.op.beparams):
4977
      raise errors.OpPrereqError("No changes submitted")
4978

    
4979
    utils.CheckBEParams(self.op.beparams)
4980

    
4981
    # Disk validation
4982
    disk_addremove = 0
4983
    for disk_op, disk_dict in self.op.disks:
4984
      if disk_op == constants.DDM_REMOVE:
4985
        disk_addremove += 1
4986
        continue
4987
      elif disk_op == constants.DDM_ADD:
4988
        disk_addremove += 1
4989
      else:
4990
        if not isinstance(disk_op, int):
4991
          raise errors.OpPrereqError("Invalid disk index")
4992
      if disk_op == constants.DDM_ADD:
4993
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4994
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4995
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4996
        size = disk_dict.get('size', None)
4997
        if size is None:
4998
          raise errors.OpPrereqError("Required disk parameter size missing")
4999
        try:
5000
          size = int(size)
5001
        except ValueError, err:
5002
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5003
                                     str(err))
5004
        disk_dict['size'] = size
5005
      else:
5006
        # modification of disk
5007
        if 'size' in disk_dict:
5008
          raise errors.OpPrereqError("Disk size change not possible, use"
5009
                                     " grow-disk")
5010

    
5011
    if disk_addremove > 1:
5012
      raise errors.OpPrereqError("Only one disk add or remove operation"
5013
                                 " supported at a time")
5014

    
5015
    # NIC validation
5016
    nic_addremove = 0
5017
    for nic_op, nic_dict in self.op.nics:
5018
      if nic_op == constants.DDM_REMOVE:
5019
        nic_addremove += 1
5020
        continue
5021
      elif nic_op == constants.DDM_ADD:
5022
        nic_addremove += 1
5023
      else:
5024
        if not isinstance(nic_op, int):
5025
          raise errors.OpPrereqError("Invalid nic index")
5026

    
5027
      # nic_dict should be a dict
5028
      nic_ip = nic_dict.get('ip', None)
5029
      if nic_ip is not None:
5030
        if nic_ip.lower() == "none":
5031
          nic_dict['ip'] = None
5032
        else:
5033
          if not utils.IsValidIP(nic_ip):
5034
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5035
      # we can only check None bridges and assign the default one
5036
      nic_bridge = nic_dict.get('bridge', None)
5037
      if nic_bridge is None:
5038
        nic_dict['bridge'] = self.cfg.GetDefBridge()
5039
      # but we can validate MACs
5040
      nic_mac = nic_dict.get('mac', None)
5041
      if nic_mac is not None:
5042
        if self.cfg.IsMacInUse(nic_mac):
5043
          raise errors.OpPrereqError("MAC address %s already in use"
5044
                                     " in cluster" % nic_mac)
5045
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5046
          if not utils.IsValidMac(nic_mac):
5047
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5048
    if nic_addremove > 1:
5049
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5050
                                 " supported at a time")
5051

    
5052
  def ExpandNames(self):
5053
    self._ExpandAndLockInstance()
5054
    self.needed_locks[locking.LEVEL_NODE] = []
5055
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5056

    
5057
  def DeclareLocks(self, level):
5058
    if level == locking.LEVEL_NODE:
5059
      self._LockInstancesNodes()
5060

    
5061
  def BuildHooksEnv(self):
5062
    """Build hooks env.
5063

5064
    This runs on the master, primary and secondaries.
5065

5066
    """
5067
    args = dict()
5068
    if constants.BE_MEMORY in self.be_new:
5069
      args['memory'] = self.be_new[constants.BE_MEMORY]
5070
    if constants.BE_VCPUS in self.be_new:
5071
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5072
    # FIXME: readd disk/nic changes
5073
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5074
    nl = [self.cfg.GetMasterNode(),
5075
          self.instance.primary_node] + list(self.instance.secondary_nodes)
5076
    return env, nl, nl
5077

    
5078
  def CheckPrereq(self):
5079
    """Check prerequisites.
5080

5081
    This only checks the instance list against the existing names.
5082

5083
    """
5084
    force = self.force = self.op.force
5085

    
5086
    # checking the new params on the primary/secondary nodes
5087

    
5088
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5089
    assert self.instance is not None, \
5090
      "Cannot retrieve locked instance %s" % self.op.instance_name
5091
    pnode = self.instance.primary_node
5092
    nodelist = [pnode]
5093
    nodelist.extend(instance.secondary_nodes)
5094

    
5095
    # hvparams processing
5096
    if self.op.hvparams:
5097
      i_hvdict = copy.deepcopy(instance.hvparams)
5098
      for key, val in self.op.hvparams.iteritems():
5099
        if val == constants.VALUE_DEFAULT:
5100
          try:
5101
            del i_hvdict[key]
5102
          except KeyError:
5103
            pass
5104
        elif val == constants.VALUE_NONE:
5105
          i_hvdict[key] = None
5106
        else:
5107
          i_hvdict[key] = val
5108
      cluster = self.cfg.GetClusterInfo()
5109
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5110
                                i_hvdict)
5111
      # local check
5112
      hypervisor.GetHypervisor(
5113
        instance.hypervisor).CheckParameterSyntax(hv_new)
5114
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5115
      self.hv_new = hv_new # the new actual values
5116
      self.hv_inst = i_hvdict # the new dict (without defaults)
5117
    else:
5118
      self.hv_new = self.hv_inst = {}
5119

    
5120
    # beparams processing
5121
    if self.op.beparams:
5122
      i_bedict = copy.deepcopy(instance.beparams)
5123
      for key, val in self.op.beparams.iteritems():
5124
        if val == constants.VALUE_DEFAULT:
5125
          try:
5126
            del i_bedict[key]
5127
          except KeyError:
5128
            pass
5129
        else:
5130
          i_bedict[key] = val
5131
      cluster = self.cfg.GetClusterInfo()
5132
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5133
                                i_bedict)
5134
      self.be_new = be_new # the new actual values
5135
      self.be_inst = i_bedict # the new dict (without defaults)
5136
    else:
5137
      self.be_new = self.be_inst = {}
5138

    
5139
    self.warn = []
5140

    
5141
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5142
      mem_check_list = [pnode]
5143
      if be_new[constants.BE_AUTO_BALANCE]:
5144
        # either we changed auto_balance to yes or it was from before
5145
        mem_check_list.extend(instance.secondary_nodes)
5146
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5147
                                                  instance.hypervisor)
5148
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5149
                                         instance.hypervisor)
5150
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5151
        # Assume the primary node is unreachable and go ahead
5152
        self.warn.append("Can't get info from primary node %s" % pnode)
5153
      else:
5154
        if not instance_info.failed and instance_info.data:
5155
          current_mem = instance_info.data['memory']
5156
        else:
5157
          # Assume instance not running
5158
          # (there is a slight race condition here, but it's not very probable,
5159
          # and we have no other way to check)
5160
          current_mem = 0
5161
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5162
                    nodeinfo[pnode].data['memory_free'])
5163
        if miss_mem > 0:
5164
          raise errors.OpPrereqError("This change will prevent the instance"
5165
                                     " from starting, due to %d MB of memory"
5166
                                     " missing on its primary node" % miss_mem)
5167

    
5168
      if be_new[constants.BE_AUTO_BALANCE]:
5169
        for node in instance.secondary_nodes:
          nres = nodeinfo[node]
5170
          if nres.failed or not isinstance(nres.data, dict):
5171
            self.warn.append("Can't get info from secondary node %s" % node)
5172
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5173
            self.warn.append("Not enough memory to failover instance to"
5174
                             " secondary node %s" % node)
5175

    
5176
    # NIC processing
5177
    for nic_op, nic_dict in self.op.nics:
5178
      if nic_op == constants.DDM_REMOVE:
5179
        if not instance.nics:
5180
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5181
        continue
5182
      if nic_op != constants.DDM_ADD:
5183
        # an existing nic
5184
        if nic_op < 0 or nic_op >= len(instance.nics):
5185
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5186
                                     " are 0 to %d" %
5187
                                     (nic_op, len(instance.nics)))
5188
      nic_bridge = nic_dict.get('bridge', None)
5189
      if nic_bridge is not None:
5190
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5191
          msg = ("Bridge '%s' doesn't exist on one of"
5192
                 " the instance nodes" % nic_bridge)
5193
          if self.force:
5194
            self.warn.append(msg)
5195
          else:
5196
            raise errors.OpPrereqError(msg)
5197

    
5198
    # DISK processing
5199
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5200
      raise errors.OpPrereqError("Disk operations not supported for"
5201
                                 " diskless instances")
5202
    for disk_op, disk_dict in self.op.disks:
5203
      if disk_op == constants.DDM_REMOVE:
5204
        if len(instance.disks) == 1:
5205
          raise errors.OpPrereqError("Cannot remove the last disk of"
5206
                                     " an instance")
5207
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5208
        ins_l = ins_l[pnode]
5209
        if not isinstance(ins_l, list):
5210
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5211
        if instance.name in ins_l:
5212
          raise errors.OpPrereqError("Instance is running, can't remove"
5213
                                     " disks.")
5214

    
5215
      if (disk_op == constants.DDM_ADD and
5216
          len(instance.nics) >= constants.MAX_DISKS):
5217
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5218
                                   " add more" % constants.MAX_DISKS)
5219
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5220
        # an existing disk
5221
        if disk_op < 0 or disk_op >= len(instance.disks):
5222
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5223
                                     " are 0 to %d" %
5224
                                     (disk_op, len(instance.disks)))
5225

    
5226
    return
5227

    
5228
  def Exec(self, feedback_fn):
5229
    """Modifies an instance.
5230

5231
    All parameters take effect only at the next restart of the instance.
5232

5233
    """
5234
    # Process here the warnings from CheckPrereq, as we don't have a
5235
    # feedback_fn there.
5236
    for warn in self.warn:
5237
      feedback_fn("WARNING: %s" % warn)
5238

    
5239
    result = []
5240
    instance = self.instance
5241
    # disk changes
5242
    for disk_op, disk_dict in self.op.disks:
5243
      if disk_op == constants.DDM_REMOVE:
5244
        # remove the last disk
5245
        device = instance.disks.pop()
5246
        device_idx = len(instance.disks)
5247
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5248
          self.cfg.SetDiskID(disk, node)
5249
          rpc_result = self.rpc.call_blockdev_remove(node, disk)
          if rpc_result.failed or not rpc_result.data:
5251
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
5252
                                 " continuing anyway", device_idx, node)
5253
        result.append(("disk/%d" % device_idx, "remove"))
5254
      elif disk_op == constants.DDM_ADD:
5255
        # add a new disk
5256
        if instance.disk_template == constants.DT_FILE:
5257
          file_driver, file_path = instance.disks[0].logical_id
5258
          file_path = os.path.dirname(file_path)
5259
        else:
5260
          file_driver = file_path = None
5261
        disk_idx_base = len(instance.disks)
5262
        new_disk = _GenerateDiskTemplate(self,
5263
                                         instance.disk_template,
5264
                                         instance, instance.primary_node,
5265
                                         instance.secondary_nodes,
5266
                                         [disk_dict],
5267
                                         file_path,
5268
                                         file_driver,
5269
                                         disk_idx_base)[0]
5270
        new_disk.mode = disk_dict['mode']
5271
        instance.disks.append(new_disk)
5272
        info = _GetInstanceInfoText(instance)
5273

    
5274
        logging.info("Creating volume %s for instance %s",
5275
                     new_disk.iv_name, instance.name)
5276
        # Note: this needs to be kept in sync with _CreateDisks
5277
        #HARDCODE
5278
        for secondary_node in instance.secondary_nodes:
5279
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5280
                                            new_disk, False, info):
5281
            self.LogWarning("Failed to create volume %s (%s) on"
5282
                            " secondary node %s!",
5283
                            new_disk.iv_name, new_disk, secondary_node)
5284
        #HARDCODE
5285
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5286
                                        instance, new_disk, info):
5287
          self.LogWarning("Failed to create volume %s on primary!",
5288
                          new_disk.iv_name)
5289
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5290
                       (new_disk.size, new_disk.mode)))
5291
      else:
5292
        # change a given disk
5293
        instance.disks[disk_op].mode = disk_dict['mode']
5294
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5295
    # NIC changes
5296
    for nic_op, nic_dict in self.op.nics:
5297
      if nic_op == constants.DDM_REMOVE:
5298
        # remove the last nic
5299
        del instance.nics[-1]
5300
        result.append(("nic.%d" % len(instance.nics), "remove"))
5301
      elif nic_op == constants.DDM_ADD:
5302
        # add a new nic
5303
        if 'mac' not in nic_dict:
5304
          mac = constants.VALUE_GENERATE
5305
        else:
5306
          mac = nic_dict['mac']
5307
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5308
          mac = self.cfg.GenerateMAC()
5309
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5310
                              bridge=nic_dict.get('bridge', None))
5311
        instance.nics.append(new_nic)
5312
        result.append(("nic.%d" % (len(instance.nics) - 1),
5313
                       "add:mac=%s,ip=%s,bridge=%s" %
5314
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
5315
      else:
5316
        # change a given nic
5317
        for key in 'mac', 'ip', 'bridge':
5318
          if key in nic_dict:
5319
            setattr(instance.nics[nic_op], key, nic_dict[key])
5320
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5321

    
5322
    # hvparams changes
5323
    if self.op.hvparams:
5324
      instance.hvparams = self.hv_new
5325
      for key, val in self.op.hvparams.iteritems():
5326
        result.append(("hv/%s" % key, val))
5327

    
5328
    # beparams changes
5329
    if self.op.beparams:
5330
      instance.beparams = self.be_inst
5331
      for key, val in self.op.beparams.iteritems():
5332
        result.append(("be/%s" % key, val))
5333

    
5334
    self.cfg.Update(instance)
5335

    
5336
    return result
5337

    
5338

    
5339
class LUQueryExports(NoHooksLU):
5340
  """Query the exports list
5341

5342
  """
5343
  _OP_REQP = ['nodes']
5344
  REQ_BGL = False
5345

    
5346
  def ExpandNames(self):
5347
    self.needed_locks = {}
5348
    self.share_locks[locking.LEVEL_NODE] = 1
5349
    if not self.op.nodes:
5350
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5351
    else:
5352
      self.needed_locks[locking.LEVEL_NODE] = \
5353
        _GetWantedNodes(self, self.op.nodes)
5354

    
5355
  def CheckPrereq(self):
5356
    """Check prerequisites.
5357

5358
    """
5359
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5360

    
5361
  def Exec(self, feedback_fn):
5362
    """Compute the list of all the exported system images.
5363

5364
    @rtype: dict
5365
    @return: a dictionary with the structure node->(export-list)
5366
        where export-list is a list of the instances exported on
5367
        that node.
5368

5369
    """
5370
    rpcresult = self.rpc.call_export_list(self.nodes)
5371
    result = {}
5372
    for node in rpcresult:
5373
      if rpcresult[node].failed:
5374
        result[node] = False
5375
      else:
5376
        result[node] = rpcresult[node].data
5377

    
5378
    return result
5379

    
5380

    
5381
class LUExportInstance(LogicalUnit):
5382
  """Export an instance to an image in the cluster.
5383

5384
  """
5385
  HPATH = "instance-export"
5386
  HTYPE = constants.HTYPE_INSTANCE
5387
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
5388
  REQ_BGL = False
5389

    
5390
  def ExpandNames(self):
5391
    self._ExpandAndLockInstance()
5392
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5401

    
5402
  def DeclareLocks(self, level):
5403
    """Last minute lock declaration."""
5404
    # All nodes are locked anyway, so nothing to do here.
5405

    
5406
  def BuildHooksEnv(self):
5407
    """Build hooks env.
5408

5409
    This will run on the master, primary node and target node.
5410

5411
    """
5412
    env = {
5413
      "EXPORT_NODE": self.op.target_node,
5414
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5415
      }
5416
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5417
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5418
          self.op.target_node]
5419
    return env, nl, nl
5420

    
5421
  def CheckPrereq(self):
5422
    """Check prerequisites.
5423

5424
    This checks that the instance and node names are valid.
5425

5426
    """
5427
    instance_name = self.op.instance_name
5428
    self.instance = self.cfg.GetInstanceInfo(instance_name)
5429
    assert self.instance is not None, \
5430
          "Cannot retrieve locked instance %s" % self.op.instance_name
5431

    
5432
    self.dst_node = self.cfg.GetNodeInfo(
5433
      self.cfg.ExpandNodeName(self.op.target_node))
5434

    
5435
    if self.dst_node is None:
5436
      # This is wrong node name, not a non-locked node
5437
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5438

    
5439
    # instance disk type verification
5440
    for disk in self.instance.disks:
5441
      if disk.dev_type == constants.LD_FILE:
5442
        raise errors.OpPrereqError("Export not supported for instances with"
5443
                                   " file-based disks")
5444

    
5445
  def Exec(self, feedback_fn):
5446
    """Export an instance to an image in the cluster.
5447

5448
    """
5449
    instance = self.instance
5450
    dst_node = self.dst_node
5451
    src_node = instance.primary_node
5452
    if self.op.shutdown:
5453
      # shutdown the instance, but not the disks
5454
      result = self.rpc.call_instance_shutdown(src_node, instance)
5455
      result.Raise()
5456
      if not result.data:
5457
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5458
                                 (instance.name, src_node))
5459

    
5460
    vgname = self.cfg.GetVGName()
5461

    
5462
    snap_disks = []
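    # snapshot every disk; entries are False for disks that could not be
    # snapshotted, so the export/cleanup loop below can skip them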
5463

    
5464
    try:
5465
      for disk in instance.disks:
5466
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5467
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5468
        if new_dev_name.failed or not new_dev_name.data:
5469
          self.LogWarning("Could not snapshot block device %s on node %s",
5470
                          disk.logical_id[1], src_node)
5471
          snap_disks.append(False)
5472
        else:
5473
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5474
                                 logical_id=(vgname, new_dev_name.data),
5475
                                 physical_id=(vgname, new_dev_name.data),
5476
                                 iv_name=disk.iv_name)
5477
          snap_disks.append(new_dev)
5478

    
5479
    finally:
5480
      if self.op.shutdown and instance.status == "up":
5481
        result = self.rpc.call_instance_start(src_node, instance, None)
5482
        if result.failed or not result.data:
5483
          _ShutdownInstanceDisks(self, instance)
5484
          raise errors.OpExecError("Could not start instance")
5485

    
5486
    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        result = self.rpc.call_blockdev_remove(src_node, dev)
        if result.failed or not result.data:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
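    # exportlist maps each locked node name to an RpcResult; a successful
    # result's .data member is the list of export names present on that node,
    # while failed nodes are skipped below with a warning.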
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name
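
  # CheckPrereq below resolves self.target to the configuration object the
  # tag operation acts upon: the cluster itself for TAG_CLUSTER, a node for
  # TAG_NODE, or an instance for TAG_INSTANCE.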

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of matching (path, tag) pairs.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
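    # The result is a list of (path, tag) pairs, e.g. (with illustrative
    # names) [("/cluster", "production"),
    #         ("/instances/instance1.example.com", "web")].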
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or _RELO_KEYS
      class attribute are required)
    - four buffer attributes (in_text, in_data, out_text, out_data) that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
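    # Overall shape of self.in_data at this point (illustrative sketch):
    #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
    #    "enable_hypervisors": [...],
    #    "nodes": {node_name: {"total_memory": ..., "free_memory": ...,
    #                          "total_disk": ..., "free_disk": ...,
    #                          "total_cpus": ..., ...}},
    #    "instances": {instance_name: {"memory": ..., "vcpus": ...,
    #                                  "disks": [...], "nics": [...], ...}}}
    # The mode-specific "request" entry is added later by _AddNewInstance or
    # _AddRelocateInstance.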

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
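    # A valid parsed reply therefore looks like (illustrative values):
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}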


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result