
lib/cmdlib.py @ 0fff97e9

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring the
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing these separately is better because:
118

119
      - ExpandNames is left as purely a lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods need not worry about missing parameters.
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, ecc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_' as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If the hook should run on no nodes, return an empty list (and not None).
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instances' nodes, or
295
    to just lock primary or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
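
# Illustrative sketch only (not part of the original module): a minimal
# concurrent LU wiring together the ExpandNames/DeclareLocks/CheckPrereq/Exec
# contract described in the docstrings above. The class, its opcode and the
# "instance_name" attribute are hypothetical and shown purely as an example.
class _ExampleInstanceNoopLU(LogicalUnit):
  """Hypothetical LU locking one instance and its nodes, then doing nothing."""
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expands self.op.instance_name and declares the instance-level lock
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s and its nodes are locked" % self.instance.name)
    return self.instance.name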
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: non-empty list of node names to expand and check
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
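
# Usage sketch (illustrative only): an LU with a "nodes" opcode attribute
# would typically expand and lock the named nodes in ExpandNames like:
#
#   self.needed_locks = {
#     locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
#     }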
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = lu.cfg.GetInstanceList()
396
  return utils.NiceSort(wanted)
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419
                          memory, vcpus, nics):
420
  """Builds instance related env variables for hooks
421

422
  This builds the hook environment from individual variables.
423

424
  @type name: string
425
  @param name: the name of the instance
426
  @type primary_node: string
427
  @param primary_node: the name of the instance's primary node
428
  @type secondary_nodes: list
429
  @param secondary_nodes: list of secondary nodes as strings
430
  @type os_type: string
431
  @param os_type: the name of the instance's OS
432
  @type status: string
433
  @param status: the desired status of the instance
434
  @type memory: string
435
  @param memory: the memory size of the instance
436
  @type vcpus: string
437
  @param vcpus: the count of VCPUs the instance has
438
  @type nics: list
439
  @param nics: list of tuples (ip, bridge, mac) representing
440
      the NICs the instance has
441
  @rtype: dict
442
  @return: the hook environment for this instance
443

444
  """
445
  env = {
446
    "OP_TARGET": name,
447
    "INSTANCE_NAME": name,
448
    "INSTANCE_PRIMARY": primary_node,
449
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450
    "INSTANCE_OS_TYPE": os_type,
451
    "INSTANCE_STATUS": status,
452
    "INSTANCE_MEMORY": memory,
453
    "INSTANCE_VCPUS": vcpus,
454
  }
455

    
456
  if nics:
457
    nic_count = len(nics)
458
    for idx, (ip, bridge, mac) in enumerate(nics):
459
      if ip is None:
460
        ip = ""
461
      env["INSTANCE_NIC%d_IP" % idx] = ip
462
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464
  else:
465
    nic_count = 0
466

    
467
  env["INSTANCE_NIC_COUNT"] = nic_count
468

    
469
  return env
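
# Example (sketch, all values invented): a single-NIC instance would produce
# an environment along the lines of:
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-etch", "up",
#                         "512", "1",
#                         [("198.51.100.10", "xen-br0", "aa:00:00:00:00:01")])
#   => {"OP_TARGET": "inst1.example.com",
#       "INSTANCE_NAME": "inst1.example.com",
#       "INSTANCE_PRIMARY": "node1.example.com",
#       "INSTANCE_SECONDARIES": "node2.example.com",
#       "INSTANCE_OS_TYPE": "debian-etch",
#       "INSTANCE_STATUS": "up",
#       "INSTANCE_MEMORY": "512",
#       "INSTANCE_VCPUS": "1",
#       "INSTANCE_NIC_COUNT": 1,
#       "INSTANCE_NIC0_IP": "198.51.100.10",
#       "INSTANCE_NIC0_BRIDGE": "xen-br0",
#       "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01"}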
470

    
471

    
472
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473
  """Builds instance related env variables for hooks from an object.
474

475
  @type lu: L{LogicalUnit}
476
  @param lu: the logical unit on whose behalf we execute
477
  @type instance: L{objects.Instance}
478
  @param instance: the instance for which we should build the
479
      environment
480
  @type override: dict
481
  @param override: dictionary with key/values that will override
482
      our values
483
  @rtype: dict
484
  @return: the hook environment dictionary
485

486
  """
487
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
488
  args = {
489
    'name': instance.name,
490
    'primary_node': instance.primary_node,
491
    'secondary_nodes': instance.secondary_nodes,
492
    'os_type': instance.os,
493
    'status': instance.status,
494
    'memory': bep[constants.BE_MEMORY],
495
    'vcpus': bep[constants.BE_VCPUS],
496
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497
  }
498
  if override:
499
    args.update(override)
500
  return _BuildInstanceHookEnv(**args)
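
# Usage sketch: callers can override individual argument values before the
# environment is built, e.g. forcing the status reported to the hooks:
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"status": "down"})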
501

    
502

    
503
def _CheckInstanceBridgesExist(lu, instance):
504
  """Check that the brigdes needed by an instance exist.
505

506
  """
507
  # check bridges existence
508
  brlist = [nic.bridge for nic in instance.nics]
509
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510
  result.Raise()
511
  if not result.data:
512
    raise errors.OpPrereqError("One or more target bridges %s does not"
513
                               " exist on destination node '%s'" %
514
                               (brlist, instance.primary_node))
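
# Usage sketch: LUs that are about to (re)start an instance on its primary
# node typically call this from their CheckPrereq, e.g.
#
#   _CheckInstanceBridgesExist(self, self.instance)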
515

    
516

    
517
class LUDestroyCluster(NoHooksLU):
518
  """Logical unit for destroying the cluster.
519

520
  """
521
  _OP_REQP = []
522

    
523
  def CheckPrereq(self):
524
    """Check prerequisites.
525

526
    This checks whether the cluster is empty.
527

528
    Any errors are signalled by raising errors.OpPrereqError.
529

530
    """
531
    master = self.cfg.GetMasterNode()
532

    
533
    nodelist = self.cfg.GetNodeList()
534
    if len(nodelist) != 1 or nodelist[0] != master:
535
      raise errors.OpPrereqError("There are still %d node(s) in"
536
                                 " this cluster." % (len(nodelist) - 1))
537
    instancelist = self.cfg.GetInstanceList()
538
    if instancelist:
539
      raise errors.OpPrereqError("There are still %d instance(s) in"
540
                                 " this cluster." % len(instancelist))
541

    
542
  def Exec(self, feedback_fn):
543
    """Destroys the cluster.
544

545
    """
546
    master = self.cfg.GetMasterNode()
547
    result = self.rpc.call_node_stop_master(master, False)
548
    result.Raise()
549
    if not result.data:
550
      raise errors.OpExecError("Could not disable the master role")
551
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552
    utils.CreateBackup(priv_key)
553
    utils.CreateBackup(pub_key)
554
    return master
555

    
556

    
557
class LUVerifyCluster(LogicalUnit):
558
  """Verifies the cluster status.
559

560
  """
561
  HPATH = "cluster-verify"
562
  HTYPE = constants.HTYPE_CLUSTER
563
  _OP_REQP = ["skip_checks"]
564
  REQ_BGL = False
565

    
566
  def ExpandNames(self):
567
    self.needed_locks = {
568
      locking.LEVEL_NODE: locking.ALL_SET,
569
      locking.LEVEL_INSTANCE: locking.ALL_SET,
570
    }
571
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572

    
573
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574
                  node_result, feedback_fn, master_files):
575
    """Run multiple tests against a node.
576

577
    Test list:
578

579
      - compares ganeti version
580
      - checks vg existence and size > 20G
581
      - checks config file checksum
582
      - checks ssh to other nodes
583

584
    @type nodeinfo: L{objects.Node}
585
    @param nodeinfo: the node to check
586
    @param file_list: required list of files
587
    @param local_cksum: dictionary of local files and their checksums
588
    @param node_result: the results from the node
589
    @param feedback_fn: function used to accumulate results
590
    @param master_files: list of files that only masters should have
591

592
    """
593
    node = nodeinfo.name
594

    
595
    # main result, node_result should be a non-empty dict
596
    if not node_result or not isinstance(node_result, dict):
597
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598
      return True
599

    
600
    # compares ganeti version
601
    local_version = constants.PROTOCOL_VERSION
602
    remote_version = node_result.get('version', None)
603
    if not remote_version:
604
      feedback_fn("  - ERROR: connection to %s failed" % (node))
605
      return True
606

    
607
    if local_version != remote_version:
608
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609
                      (local_version, node, remote_version))
610
      return True
611

    
612
    # checks vg existence and size > 20G
613

    
614
    bad = False
615
    vglist = node_result.get(constants.NV_VGLIST, None)
616
    if not vglist:
617
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618
                      (node,))
619
      bad = True
620
    else:
621
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622
                                            constants.MIN_VG_SIZE)
623
      if vgstatus:
624
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625
        bad = True
626

    
627
    # checks config file checksum
628

    
629
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
630
    if not isinstance(remote_cksum, dict):
631
      bad = True
632
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
633
    else:
634
      for file_name in file_list:
635
        node_is_mc = nodeinfo.master_candidate
636
        must_have_file = file_name not in master_files
637
        if file_name not in remote_cksum:
638
          if node_is_mc or must_have_file:
639
            bad = True
640
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
641
        elif remote_cksum[file_name] != local_cksum[file_name]:
642
          if node_is_mc or must_have_file:
643
            bad = True
644
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645
          else:
646
            # not candidate and this is not a must-have file
647
            bad = True
648
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649
                        " '%s'" % file_name)
650
        else:
651
          # all good, except non-master/non-must have combination
652
          if not node_is_mc and not must_have_file:
653
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
654
                        " candidates" % file_name)
655

    
656
    # checks ssh to any
657

    
658
    if constants.NV_NODELIST not in node_result:
659
      bad = True
660
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661
    else:
662
      if node_result[constants.NV_NODELIST]:
663
        bad = True
664
        for node in node_result[constants.NV_NODELIST]:
665
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666
                          (node, node_result[constants.NV_NODELIST][node]))
667

    
668
    if constants.NV_NODENETTEST not in node_result:
669
      bad = True
670
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671
    else:
672
      if node_result[constants.NV_NODENETTEST]:
673
        bad = True
674
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675
        for node in nlist:
676
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677
                          (node, node_result[constants.NV_NODENETTEST][node]))
678

    
679
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680
    if isinstance(hyp_result, dict):
681
      for hv_name, hv_result in hyp_result.iteritems():
682
        if hv_result is not None:
683
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684
                      (hv_name, hv_result))
685
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if not instanceconfig.status == 'down':
710
      if (node_current not in node_instance or
711
          not instance in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if (not node == node_current):
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758
    """Verify N+1 Memory Resilience.
759

760
    Check that if one single node dies we can still start all the instances it
761
    was primary for.
762

763
    """
764
    bad = False
765

    
766
    for node, nodeinfo in node_info.iteritems():
767
      # This code checks that every node which is now listed as secondary has
768
      # enough memory to host all instances it is supposed to, should a single
769
      # other node in the cluster fail.
770
      # FIXME: not ready for failover to an arbitrary node
771
      # FIXME: does not support file-backed instances
772
      # WARNING: we currently take into account down instances as well as up
773
      # ones, considering that even if they're down someone might want to start
774
      # them even in the event of a node failure.
775
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776
        needed_mem = 0
777
        for instance in instances:
778
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779
          if bep[constants.BE_AUTO_BALANCE]:
780
            needed_mem += bep[constants.BE_MEMORY]
781
        if nodeinfo['mfree'] < needed_mem:
782
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
783
                      " failovers should node %s fail" % (node, prinode))
784
          bad = True
785
    return bad
786

    
787
  def CheckPrereq(self):
788
    """Check prerequisites.
789

790
    Transform the list of checks we're going to skip into a set and check that
791
    all its members are valid.
792

793
    """
794
    self.skip_set = frozenset(self.op.skip_checks)
795
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
797

    
798
  def BuildHooksEnv(self):
799
    """Build hooks env.
800

801
    Cluster-Verify hooks just run in the post phase and their failure causes
802
    the output to be logged in the verify output and the verification to fail.
803

804
    """
805
    all_nodes = self.cfg.GetNodeList()
806
    # TODO: populate the environment with useful information for verify hooks
807
    env = {}
808
    return env, [], all_nodes
809

    
810
  def Exec(self, feedback_fn):
811
    """Verify integrity of cluster, performing various test on nodes.
812

813
    """
814
    bad = False
815
    feedback_fn("* Verifying global settings")
816
    for msg in self.cfg.VerifyConfig():
817
      feedback_fn("  - ERROR: %s" % msg)
818

    
819
    vg_name = self.cfg.GetVGName()
820
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
822
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824
    i_non_redundant = [] # Non redundant instances
825
    i_non_a_balanced = [] # Non auto-balanced instances
826
    node_volume = {}
827
    node_instance = {}
828
    node_info = {}
829
    instance_cfg = {}
830

    
831
    # FIXME: verify OS list
832
    # do local checksums
833
    master_files = [constants.CLUSTER_CONF_FILE]
834

    
835
    file_names = ssconf.SimpleStore().GetFileList()
836
    file_names.append(constants.SSL_CERT_FILE)
837
    file_names.extend(master_files)
838

    
839
    local_checksums = utils.FingerprintFiles(file_names)
840

    
841
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842
    node_verify_param = {
843
      constants.NV_FILELIST: file_names,
844
      constants.NV_NODELIST: nodelist,
845
      constants.NV_HYPERVISOR: hypervisors,
846
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847
                                  node.secondary_ip) for node in nodeinfo],
848
      constants.NV_LVLIST: vg_name,
849
      constants.NV_INSTANCELIST: hypervisors,
850
      constants.NV_VGLIST: None,
851
      constants.NV_VERSION: None,
852
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853
      }
854
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855
                                           self.cfg.GetClusterName())
856

    
857
    cluster = self.cfg.GetClusterInfo()
858
    master_node = self.cfg.GetMasterNode()
859
    for node_i in nodeinfo:
860
      node = node_i.name
861
      nresult = all_nvinfo[node].data
862

    
863
      if node == master_node:
864
        ntype = "master"
865
      elif node_i.master_candidate:
866
        ntype = "master candidate"
867
      else:
868
        ntype = "regular"
869
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870

    
871
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
872
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
873
        bad = True
874
        continue
875

    
876
      result = self._VerifyNode(node_i, file_names, local_checksums,
877
                                nresult, feedback_fn, master_files)
878
      bad = bad or result
879

    
880
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881
      if isinstance(lvdata, basestring):
882
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883
                    (node, lvdata.encode('string_escape')))
884
        bad = True
885
        node_volume[node] = {}
886
      elif not isinstance(lvdata, dict):
887
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888
        bad = True
889
        continue
890
      else:
891
        node_volume[node] = lvdata
892

    
893
      # node_instance
894
      idata = nresult.get(constants.NV_INSTANCELIST, None)
895
      if not isinstance(idata, list):
896
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897
                    (node,))
898
        bad = True
899
        continue
900

    
901
      node_instance[node] = idata
902

    
903
      # node_info
904
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
905
      if not isinstance(nodeinfo, dict):
906
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907
        bad = True
908
        continue
909

    
910
      try:
911
        node_info[node] = {
912
          "mfree": int(nodeinfo['memory_free']),
913
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914
          "pinst": [],
915
          "sinst": [],
916
          # dictionary holding all instances this node is secondary for,
917
          # grouped by their primary node. Each key is a cluster node, and each
918
          # value is a list of instances which have the key as primary and the
919
          # current node as secondary.  this is handy to calculate N+1 memory
920
          # availability if you can only failover from a primary to its
921
          # secondary.
922
          "sinst-by-pnode": {},
923
        }
924
      except ValueError:
925
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926
        bad = True
927
        continue
928

    
929
    node_vol_should = {}
930

    
931
    for instance in instancelist:
932
      feedback_fn("* Verifying instance %s" % instance)
933
      inst_config = self.cfg.GetInstanceInfo(instance)
934
      result =  self._VerifyInstance(instance, inst_config, node_volume,
935
                                     node_instance, feedback_fn)
936
      bad = bad or result
937

    
938
      inst_config.MapLVsByNode(node_vol_should)
939

    
940
      instance_cfg[instance] = inst_config
941

    
942
      pnode = inst_config.primary_node
943
      if pnode in node_info:
944
        node_info[pnode]['pinst'].append(instance)
945
      else:
946
        feedback_fn("  - ERROR: instance %s, connection to primary node"
947
                    " %s failed" % (instance, pnode))
948
        bad = True
949

    
950
      # If the instance is non-redundant we cannot survive losing its primary
951
      # node, so we are not N+1 compliant. On the other hand we have no disk
952
      # templates with more than one secondary so that situation is not well
953
      # supported either.
954
      # FIXME: does not support file-backed instances
955
      if len(inst_config.secondary_nodes) == 0:
956
        i_non_redundant.append(instance)
957
      elif len(inst_config.secondary_nodes) > 1:
958
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
959
                    % instance)
960

    
961
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962
        i_non_a_balanced.append(instance)
963

    
964
      for snode in inst_config.secondary_nodes:
965
        if snode in node_info:
966
          node_info[snode]['sinst'].append(instance)
967
          if pnode not in node_info[snode]['sinst-by-pnode']:
968
            node_info[snode]['sinst-by-pnode'][pnode] = []
969
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970
        else:
971
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
972
                      " %s failed" % (instance, snode))
973

    
974
    feedback_fn("* Verifying orphan volumes")
975
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976
                                       feedback_fn)
977
    bad = bad or result
978

    
979
    feedback_fn("* Verifying remaining instances")
980
    result = self._VerifyOrphanInstances(instancelist, node_instance,
981
                                         feedback_fn)
982
    bad = bad or result
983

    
984
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985
      feedback_fn("* Verifying N+1 Memory redundancy")
986
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987
      bad = bad or result
988

    
989
    feedback_fn("* Other Notes")
990
    if i_non_redundant:
991
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992
                  % len(i_non_redundant))
993

    
994
    if i_non_a_balanced:
995
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996
                  % len(i_non_a_balanced))
997

    
998
    return not bad
999

    
1000
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001
    """Analize the post-hooks' result
1002

1003
    This method analyses the hook result, handles it, and sends some
1004
    nicely-formatted feedback back to the user.
1005

1006
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008
    @param hooks_results: the results of the multi-node hooks rpc call
1009
    @param feedback_fn: function used to send feedback back to the caller
1010
    @param lu_result: previous Exec result
1011
    @return: the new Exec result, based on the previous result
1012
        and hook results
1013

1014
    """
1015
    # We only really run POST phase hooks, and are only interested in
1016
    # their results
1017
    if phase == constants.HOOKS_PHASE_POST:
1018
      # Used to change hooks' output to proper indentation
1019
      indent_re = re.compile('^', re.M)
1020
      feedback_fn("* Hooks Results")
1021
      if not hooks_results:
1022
        feedback_fn("  - ERROR: general communication failure")
1023
        lu_result = 1
1024
      else:
1025
        for node_name in hooks_results:
1026
          show_node_header = True
1027
          res = hooks_results[node_name]
1028
          if res.failed or res.data is False or not isinstance(res.data, list):
1029
            feedback_fn("    Communication failure in hooks execution")
1030
            lu_result = 1
1031
            continue
1032
          for script, hkr, output in res.data:
1033
            if hkr == constants.HKR_FAIL:
1034
              # The node header is only shown once, if there are
1035
              # failing hooks on that node
1036
              if show_node_header:
1037
                feedback_fn("  Node %s:" % node_name)
1038
                show_node_header = False
1039
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1040
              output = indent_re.sub('      ', output)
1041
              feedback_fn("%s" % output)
1042
              lu_result = 1
1043

    
1044
      return lu_result
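
# For reference (derived from the loop above): each node's res.data is a list
# of (script, status, output) tuples; only entries whose status equals
# constants.HKR_FAIL are reported back to the user, e.g.
#
#   [("50ganeti-check-foo", constants.HKR_FAIL, "some check failed"), ...]
#
# (the script name above is invented for illustration)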
1045

    
1046

    
1047
class LUVerifyDisks(NoHooksLU):
1048
  """Verifies the cluster disks status.
1049

1050
  """
1051
  _OP_REQP = []
1052
  REQ_BGL = False
1053

    
1054
  def ExpandNames(self):
1055
    self.needed_locks = {
1056
      locking.LEVEL_NODE: locking.ALL_SET,
1057
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1058
    }
1059
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060

    
1061
  def CheckPrereq(self):
1062
    """Check prerequisites.
1063

1064
    This has no prerequisites.
1065

1066
    """
1067
    pass
1068

    
1069
  def Exec(self, feedback_fn):
1070
    """Verify integrity of cluster disks.
1071

1072
    """
1073
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074

    
1075
    vg_name = self.cfg.GetVGName()
1076
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1077
    instances = [self.cfg.GetInstanceInfo(name)
1078
                 for name in self.cfg.GetInstanceList()]
1079

    
1080
    nv_dict = {}
1081
    for inst in instances:
1082
      inst_lvs = {}
1083
      if (inst.status != "up" or
1084
          inst.disk_template not in constants.DTS_NET_MIRROR):
1085
        continue
1086
      inst.MapLVsByNode(inst_lvs)
1087
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088
      for node, vol_list in inst_lvs.iteritems():
1089
        for vol in vol_list:
1090
          nv_dict[(node, vol)] = inst
1091

    
1092
    if not nv_dict:
1093
      return result
1094

    
1095
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096

    
1097
    to_act = set()
1098
    for node in nodes:
1099
      # node_volume
1100
      lvs = node_lvs[node]
1101
      if lvs.failed:
1102
        self.LogWarning("Connection to node %s failed: %s" %
1103
                        (node, lvs.data))
1104
        continue
1105
      lvs = lvs.data
1106
      if isinstance(lvs, basestring):
1107
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108
        res_nlvm[node] = lvs
1109
      elif not isinstance(lvs, dict):
1110
        logging.warning("Connection to node %s failed or invalid data"
1111
                        " returned", node)
1112
        res_nodes.append(node)
1113
        continue
1114

    
1115
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116
        inst = nv_dict.pop((node, lv_name), None)
1117
        if (not lv_online and inst is not None
1118
            and inst.name not in res_instances):
1119
          res_instances.append(inst.name)
1120

    
1121
    # any leftover items in nv_dict are missing LVs, let's arrange the
1122
    # data better
1123
    for key, inst in nv_dict.iteritems():
1124
      if inst.name not in res_missing:
1125
        res_missing[inst.name] = []
1126
      res_missing[inst.name].append(key)
1127

    
1128
    return result
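
# For reference (derived from the code above): the returned value is a
# 4-tuple (res_nodes, res_nlvm, res_instances, res_missing), where:
#   - res_nodes: list of nodes that returned invalid or unusable data
#   - res_nlvm: dict mapping node name -> LVM error message
#   - res_instances: list of instance names with at least one offline LV
#   - res_missing: dict mapping instance name -> list of (node, volume)
#     pairs for LVs that are missing entirely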
1129

    
1130

    
1131
class LURenameCluster(LogicalUnit):
1132
  """Rename the cluster.
1133

1134
  """
1135
  HPATH = "cluster-rename"
1136
  HTYPE = constants.HTYPE_CLUSTER
1137
  _OP_REQP = ["name"]
1138

    
1139
  def BuildHooksEnv(self):
1140
    """Build hooks env.
1141

1142
    """
1143
    env = {
1144
      "OP_TARGET": self.cfg.GetClusterName(),
1145
      "NEW_NAME": self.op.name,
1146
      }
1147
    mn = self.cfg.GetMasterNode()
1148
    return env, [mn], [mn]
1149

    
1150
  def CheckPrereq(self):
1151
    """Verify that the passed name is a valid one.
1152

1153
    """
1154
    hostname = utils.HostInfo(self.op.name)
1155

    
1156
    new_name = hostname.name
1157
    self.ip = new_ip = hostname.ip
1158
    old_name = self.cfg.GetClusterName()
1159
    old_ip = self.cfg.GetMasterIP()
1160
    if new_name == old_name and new_ip == old_ip:
1161
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162
                                 " cluster has changed")
1163
    if new_ip != old_ip:
1164
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166
                                   " reachable on the network. Aborting." %
1167
                                   new_ip)
1168

    
1169
    self.op.name = new_name
1170

    
1171
  def Exec(self, feedback_fn):
1172
    """Rename the cluster.
1173

1174
    """
1175
    clustername = self.op.name
1176
    ip = self.ip
1177

    
1178
    # shutdown the master IP
1179
    master = self.cfg.GetMasterNode()
1180
    result = self.rpc.call_node_stop_master(master, False)
1181
    if result.failed or not result.data:
1182
      raise errors.OpExecError("Could not disable the master role")
1183

    
1184
    try:
1185
      cluster = self.cfg.GetClusterInfo()
1186
      cluster.cluster_name = clustername
1187
      cluster.master_ip = ip
1188
      self.cfg.Update(cluster)
1189

    
1190
      # update the known hosts file
1191
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192
      node_list = self.cfg.GetNodeList()
1193
      try:
1194
        node_list.remove(master)
1195
      except ValueError:
1196
        pass
1197
      result = self.rpc.call_upload_file(node_list,
1198
                                         constants.SSH_KNOWN_HOSTS_FILE)
1199
      for to_node, to_result in result.iteritems():
1200
        if to_result.failed or not to_result.data:
1201
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1202

    
1203
    finally:
1204
      result = self.rpc.call_node_start_master(master, False)
1205
      if result.failed or not result.data:
1206
        self.LogWarning("Could not re-enable the master role on"
1207
                        " the master, please restart manually.")
1208

    
1209

    
1210
def _RecursiveCheckIfLVMBased(disk):
1211
  """Check if the given disk or its children are lvm-based.
1212

1213
  @type disk: L{objects.Disk}
1214
  @param disk: the disk to check
1215
  @rtype: boolean
1216
  @return: boolean indicating whether a LD_LV dev_type was found or not
1217

1218
  """
1219
  if disk.children:
1220
    for chdisk in disk.children:
1221
      if _RecursiveCheckIfLVMBased(chdisk):
1222
        return True
1223
  return disk.dev_type == constants.LD_LV
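
# Example (sketch): a DRBD-based disk whose children are two logical volumes
# makes the function recurse into the children and return True; a plain
# file-based disk with no children returns False.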
1224

    
1225

    
1226
class LUSetClusterParams(LogicalUnit):
1227
  """Change the parameters of the cluster.
1228

1229
  """
1230
  HPATH = "cluster-modify"
1231
  HTYPE = constants.HTYPE_CLUSTER
1232
  _OP_REQP = []
1233
  REQ_BGL = False
1234

    
1235
  def CheckArguments(self):
1236
    """Check parameters
1237

1238
    """
1239
    if not hasattr(self.op, "candidate_pool_size"):
1240
      self.op.candidate_pool_size = None
1241
    if self.op.candidate_pool_size is not None:
1242
      try:
1243
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244
      except ValueError, err:
1245
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246
                                   str(err))
1247
      if self.op.candidate_pool_size < 1:
1248
        raise errors.OpPrereqError("At least one master candidate needed")
1249

    
1250
  def ExpandNames(self):
1251
    # FIXME: in the future maybe other cluster params won't require checking on
1252
    # all nodes to be modified.
1253
    self.needed_locks = {
1254
      locking.LEVEL_NODE: locking.ALL_SET,
1255
    }
1256
    self.share_locks[locking.LEVEL_NODE] = 1
1257

    
1258
  def BuildHooksEnv(self):
1259
    """Build hooks env.
1260

1261
    """
1262
    env = {
1263
      "OP_TARGET": self.cfg.GetClusterName(),
1264
      "NEW_VG_NAME": self.op.vg_name,
1265
      }
1266
    mn = self.cfg.GetMasterNode()
1267
    return env, [mn], [mn]
1268

    
1269
  def CheckPrereq(self):
1270
    """Check prerequisites.
1271

1272
    This checks whether the given params don't conflict and
1273
    if the given volume group is valid.
1274

1275
    """
1276
    # FIXME: This only works because there is only one parameter that can be
1277
    # changed or removed.
1278
    if self.op.vg_name is not None and not self.op.vg_name:
1279
      instances = self.cfg.GetAllInstancesInfo().values()
1280
      for inst in instances:
1281
        for disk in inst.disks:
1282
          if _RecursiveCheckIfLVMBased(disk):
1283
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1284
                                       " lvm-based instances exist")
1285

    
1286
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1287

    
1288
    # if vg_name is not None, check the given volume group on all nodes
1289
    if self.op.vg_name:
1290
      vglist = self.rpc.call_vg_list(node_list)
1291
      for node in node_list:
1292
        if vglist[node].failed:
1293
          # ignoring down node
1294
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295
          continue
1296
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297
                                              self.op.vg_name,
1298
                                              constants.MIN_VG_SIZE)
1299
        if vgstatus:
1300
          raise errors.OpPrereqError("Error on node '%s': %s" %
1301
                                     (node, vgstatus))
1302

    
1303
    self.cluster = cluster = self.cfg.GetClusterInfo()
1304
    # validate beparams changes
1305
    if self.op.beparams:
1306
      utils.CheckBEParams(self.op.beparams)
1307
      self.new_beparams = cluster.FillDict(
1308
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309

    
1310
    # hypervisor list/parameters
1311
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312
    if self.op.hvparams:
1313
      if not isinstance(self.op.hvparams, dict):
1314
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315
      for hv_name, hv_dict in self.op.hvparams.items():
1316
        if hv_name not in self.new_hvparams:
1317
          self.new_hvparams[hv_name] = hv_dict
1318
        else:
1319
          self.new_hvparams[hv_name].update(hv_dict)
1320

    
1321
    if self.op.enabled_hypervisors is not None:
1322
      self.hv_list = self.op.enabled_hypervisors
1323
    else:
1324
      self.hv_list = cluster.enabled_hypervisors
1325

    
1326
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327
      # either the enabled list has changed, or the parameters have, validate
1328
      for hv_name, hv_params in self.new_hvparams.items():
1329
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330
            (self.op.enabled_hypervisors and
1331
             hv_name in self.op.enabled_hypervisors)):
1332
          # either this is a new hypervisor, or its parameters have changed
1333
          hv_class = hypervisor.GetHypervisor(hv_name)
1334
          hv_class.CheckParameterSyntax(hv_params)
1335
          _CheckHVParams(self, node_list, hv_name, hv_params)
1336

    
1337
  def Exec(self, feedback_fn):
1338
    """Change the parameters of the cluster.
1339

1340
    """
1341
    if self.op.vg_name is not None:
1342
      if self.op.vg_name != self.cfg.GetVGName():
1343
        self.cfg.SetVGName(self.op.vg_name)
1344
      else:
1345
        feedback_fn("Cluster LVM configuration already in desired"
1346
                    " state, not changing")
1347
    if self.op.hvparams:
1348
      self.cluster.hvparams = self.new_hvparams
1349
    if self.op.enabled_hypervisors is not None:
1350
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351
    if self.op.beparams:
1352
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353
    if self.op.candidate_pool_size is not None:
1354
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355

    
1356
    self.cfg.Update(self.cluster)
1357

    
1358
    # we want to update nodes after the cluster so that if any errors
1359
    # happen, we have recorded and saved the cluster info
1360
    if self.op.candidate_pool_size is not None:
1361
      node_info = self.cfg.GetAllNodesInfo().values()
1362
      num_candidates = len([node for node in node_info
1363
                            if node.master_candidate])
1364
      num_nodes = len(node_info)
1365
      if num_candidates < self.op.candidate_pool_size:
1366
        random.shuffle(node_info)
1367
        for node in node_info:
1368
          if num_candidates >= self.op.candidate_pool_size:
1369
            break
1370
          if node.master_candidate:
1371
            continue
1372
          node.master_candidate = True
1373
          self.LogInfo("Promoting node %s to master candidate", node.name)
1374
          self.cfg.Update(node)
1375
          self.context.ReaddNode(node)
1376
          num_candidates += 1
1377
      elif num_candidates > self.op.candidate_pool_size:
1378
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379
                     " of candidate_pool_size (%d)" %
1380
                     (num_candidates, self.op.candidate_pool_size))
1381

    
1382

    
1383
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384
  """Sleep and poll for an instance's disk to sync.
1385

1386
  """
1387
  if not instance.disks:
1388
    return True
1389

    
1390
  if not oneshot:
1391
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392

    
1393
  node = instance.primary_node
1394

    
1395
  for dev in instance.disks:
1396
    lu.cfg.SetDiskID(dev, node)
1397

    
1398
  retries = 0
1399
  while True:
1400
    max_time = 0
1401
    done = True
1402
    cumul_degraded = False
1403
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404
    if rstats.failed or not rstats.data:
1405
      lu.LogWarning("Can't get any data from node %s", node)
1406
      retries += 1
1407
      if retries >= 10:
1408
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1409
                                 " aborting." % node)
1410
      time.sleep(6)
1411
      continue
1412
    rstats = rstats.data
1413
    retries = 0
1414
    for i in range(len(rstats)):
1415
      mstat = rstats[i]
1416
      if mstat is None:
1417
        lu.LogWarning("Can't compute data for node %s/%s",
1418
                           node, instance.disks[i].iv_name)
1419
        continue
1420
      # we ignore the ldisk parameter
1421
      perc_done, est_time, is_degraded, _ = mstat
1422
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423
      if perc_done is not None:
1424
        done = False
1425
        if est_time is not None:
1426
          rem_time = "%d estimated seconds remaining" % est_time
1427
          max_time = est_time
1428
        else:
1429
          rem_time = "no time estimate"
1430
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431
                        (instance.disks[i].iv_name, perc_done, rem_time))
1432
    if done or oneshot:
1433
      break
1434

    
1435
    time.sleep(min(60, max_time))
1436

    
1437
  if done:
1438
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439
  return not cumul_degraded
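
# Usage sketch: LUs that create or rebuild mirrored disks typically wait for
# the resync and warn the user if the mirrors stay degraded, e.g.
#
#   if not _WaitForSync(self, instance):
#     self.LogWarning("Some disks of instance %s are degraded" % instance.name)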
1440

    
1441

    
1442
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443
  """Check that mirrors are not degraded.
1444

1445
  The ldisk parameter, if True, will change the test from the
1446
  is_degraded attribute (which represents overall non-ok status for
1447
  the device(s)) to the ldisk (representing the local storage status).
1448

1449
  """
1450
  lu.cfg.SetDiskID(dev, node)
1451
  if ldisk:
1452
    idx = 6
1453
  else:
1454
    idx = 5
1455

    
1456
  result = True
1457
  if on_primary or dev.AssembleOnSecondary():
1458
    rstats = lu.rpc.call_blockdev_find(node, dev)
1459
    if rstats.failed or not rstats.data:
1460
      logging.warning("Node %s: disk degraded, not found or node down", node)
1461
      result = False
1462
    else:
1463
      result = result and (not rstats.data[idx])
1464
  if dev.children:
1465
    for child in dev.children:
1466
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1467

    
1468
  return result
1469

    
1470

    
1471
class LUDiagnoseOS(NoHooksLU):
1472
  """Logical unit for OS diagnose/query.
1473

1474
  """
1475
  _OP_REQP = ["output_fields", "names"]
1476
  REQ_BGL = False
1477
  _FIELDS_STATIC = utils.FieldSet()
1478
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479

    
1480
  def ExpandNames(self):
1481
    if self.op.names:
1482
      raise errors.OpPrereqError("Selective OS query not supported")
1483

    
1484
    _CheckOutputFields(static=self._FIELDS_STATIC,
1485
                       dynamic=self._FIELDS_DYNAMIC,
1486
                       selected=self.op.output_fields)
1487

    
1488
    # Lock all nodes, in shared mode
1489
    self.needed_locks = {}
1490
    self.share_locks[locking.LEVEL_NODE] = 1
1491
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492

    
1493
  def CheckPrereq(self):
1494
    """Check prerequisites.
1495

1496
    """
1497

    
1498
  @staticmethod
1499
  def _DiagnoseByOS(node_list, rlist):
1500
    """Remaps a per-node return list into an a per-os per-node dictionary
1501

1502
    @param node_list: a list with the names of all nodes
1503
    @param rlist: a map with node names as keys and OS objects as values
1504

1505
    @rtype: dict
1506
    @returns: a dictionary with osnames as keys and as value another map, with
1507
        nodes as keys and list of OS objects as values, eg::
1508

1509
          {"debian-etch": {"node1": [<object>,...],
1510
                           "node2": [<object>,]}
1511
          }
1512

1513
    """
1514
    all_os = {}
1515
    for node_name, nr in rlist.iteritems():
1516
      if nr.failed or not nr.data:
1517
        continue
1518
      for os_obj in nr.data:
1519
        if os_obj.name not in all_os:
1520
          # build a list of nodes for this os containing empty lists
1521
          # for each node in node_list
1522
          all_os[os_obj.name] = {}
1523
          for nname in node_list:
1524
            all_os[os_obj.name][nname] = []
1525
        all_os[os_obj.name][node_name].append(os_obj)
1526
    return all_os
1527

    
1528
  def Exec(self, feedback_fn):
1529
    """Compute the list of OSes.
1530

1531
    """
1532
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1533
    node_data = self.rpc.call_os_diagnose(node_list)
1534
    if node_data == False:
1535
      raise errors.OpExecError("Can't gather the list of OSes")
1536
    pol = self._DiagnoseByOS(node_list, node_data)
1537
    output = []
1538
    for os_name, os_data in pol.iteritems():
1539
      row = []
1540
      for field in self.op.output_fields:
1541
        if field == "name":
1542
          val = os_name
1543
        elif field == "valid":
1544
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1545
        elif field == "node_status":
1546
          val = {}
1547
          for node_name, nos_list in os_data.iteritems():
1548
            val[node_name] = [(v.status, v.path) for v in nos_list]
1549
        else:
1550
          raise errors.ParameterError(field)
1551
        row.append(val)
1552
      output.append(row)
1553

    
1554
    return output
1555

    
1556

    
1557
class LURemoveNode(LogicalUnit):
1558
  """Logical unit for removing a node.
1559

1560
  """
1561
  HPATH = "node-remove"
1562
  HTYPE = constants.HTYPE_NODE
1563
  _OP_REQP = ["node_name"]
1564

    
1565
  def BuildHooksEnv(self):
1566
    """Build hooks env.
1567

1568
    This doesn't run on the target node in the pre phase as a failed
1569
    node would then be impossible to remove.
1570

1571
    """
1572
    env = {
1573
      "OP_TARGET": self.op.node_name,
1574
      "NODE_NAME": self.op.node_name,
1575
      }
1576
    all_nodes = self.cfg.GetNodeList()
1577
    all_nodes.remove(self.op.node_name)
1578
    return env, all_nodes, all_nodes
1579

    
1580
  def CheckPrereq(self):
1581
    """Check prerequisites.
1582

1583
    This checks:
1584
     - the node exists in the configuration
1585
     - it does not have primary or secondary instances
1586
     - it's not the master
1587

1588
    Any errors are signalled by raising errors.OpPrereqError.
1589

1590
    """
1591
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1592
    if node is None:
1593
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1594

    
1595
    instance_list = self.cfg.GetInstanceList()
1596

    
1597
    masternode = self.cfg.GetMasterNode()
1598
    if node.name == masternode:
1599
      raise errors.OpPrereqError("Node is the master node,"
1600
                                 " you need to failover first.")
1601

    
1602
    for instance_name in instance_list:
1603
      instance = self.cfg.GetInstanceInfo(instance_name)
1604
      if node.name == instance.primary_node:
1605
        raise errors.OpPrereqError("Instance %s still running on the node,"
1606
                                   " please remove first." % instance_name)
1607
      if node.name in instance.secondary_nodes:
1608
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609
                                   " please remove first." % instance_name)
1610
    self.op.node_name = node.name
1611
    self.node = node
1612

    
1613
  def Exec(self, feedback_fn):
1614
    """Removes the node from the cluster.
1615

1616
    """
1617
    node = self.node
1618
    logging.info("Stopping the node daemon and removing configs from node %s",
1619
                 node.name)
1620

    
1621
    self.context.RemoveNode(node.name)
1622

    
1623
    self.rpc.call_node_leave_cluster(node.name)
1624

    
1625

    
1626
class LUQueryNodes(NoHooksLU):
1627
  """Logical unit for querying nodes.
1628

1629
  """
1630
  _OP_REQP = ["output_fields", "names"]
1631
  REQ_BGL = False
1632
  _FIELDS_DYNAMIC = utils.FieldSet(
1633
    "dtotal", "dfree",
1634
    "mtotal", "mnode", "mfree",
1635
    "bootid",
1636
    "ctotal",
1637
    )
1638

    
1639
  _FIELDS_STATIC = utils.FieldSet(
1640
    "name", "pinst_cnt", "sinst_cnt",
1641
    "pinst_list", "sinst_list",
1642
    "pip", "sip", "tags",
1643
    "serial_no",
1644
    "master_candidate",
1645
    "master",
1646
    )
1647

    
1648
  def ExpandNames(self):
1649
    _CheckOutputFields(static=self._FIELDS_STATIC,
1650
                       dynamic=self._FIELDS_DYNAMIC,
1651
                       selected=self.op.output_fields)
1652

    
1653
    self.needed_locks = {}
1654
    self.share_locks[locking.LEVEL_NODE] = 1
1655

    
1656
    if self.op.names:
1657
      self.wanted = _GetWantedNodes(self, self.op.names)
1658
    else:
1659
      self.wanted = locking.ALL_SET
1660

    
1661
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1662
    if self.do_locking:
1663
      # if we don't request only static fields, we need to lock the nodes
1664
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1665

    
1666

    
1667
  def CheckPrereq(self):
1668
    """Check prerequisites.
1669

1670
    """
1671
    # The validation of the node list is done in the _GetWantedNodes,
1672
    # if non empty, and if empty, there's no validation to do
1673
    pass
1674

    
1675
  def Exec(self, feedback_fn):
1676
    """Computes the list of nodes and their attributes.
1677

1678
    """
1679
    all_info = self.cfg.GetAllNodesInfo()
1680
    if self.do_locking:
1681
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1682
    elif self.wanted != locking.ALL_SET:
1683
      nodenames = self.wanted
1684
      missing = set(nodenames).difference(all_info.keys())
1685
      if missing:
1686
        raise errors.OpExecError(
1687
          "Some nodes were removed before retrieving their data: %s" % missing)
1688
    else:
1689
      nodenames = all_info.keys()
1690

    
1691
    nodenames = utils.NiceSort(nodenames)
1692
    nodelist = [all_info[name] for name in nodenames]
1693

    
1694
    # begin data gathering
1695

    
1696
    if self.do_locking:
1697
      live_data = {}
1698
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1699
                                          self.cfg.GetHypervisorType())
1700
      for name in nodenames:
1701
        nodeinfo = node_data[name]
1702
        if not nodeinfo.failed and nodeinfo.data:
1703
          nodeinfo = nodeinfo.data
1704
          fn = utils.TryConvert
1705
          live_data[name] = {
1706
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1707
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1708
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1709
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1710
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1711
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1712
            "bootid": nodeinfo.get('bootid', None),
1713
            }
1714
        else:
1715
          live_data[name] = {}
1716
    else:
1717
      live_data = dict.fromkeys(nodenames, {})
1718

    
1719
    node_to_primary = dict([(name, set()) for name in nodenames])
1720
    node_to_secondary = dict([(name, set()) for name in nodenames])
1721

    
1722
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1723
                             "sinst_cnt", "sinst_list"))
1724
    if inst_fields & frozenset(self.op.output_fields):
1725
      instancelist = self.cfg.GetInstanceList()
1726

    
1727
      for instance_name in instancelist:
1728
        inst = self.cfg.GetInstanceInfo(instance_name)
1729
        if inst.primary_node in node_to_primary:
1730
          node_to_primary[inst.primary_node].add(inst.name)
1731
        for secnode in inst.secondary_nodes:
1732
          if secnode in node_to_secondary:
1733
            node_to_secondary[secnode].add(inst.name)
1734

    
1735
    master_node = self.cfg.GetMasterNode()
1736

    
1737
    # end data gathering
1738

    
1739
    output = []
1740
    for node in nodelist:
1741
      node_output = []
1742
      for field in self.op.output_fields:
1743
        if field == "name":
1744
          val = node.name
1745
        elif field == "pinst_list":
1746
          val = list(node_to_primary[node.name])
1747
        elif field == "sinst_list":
1748
          val = list(node_to_secondary[node.name])
1749
        elif field == "pinst_cnt":
1750
          val = len(node_to_primary[node.name])
1751
        elif field == "sinst_cnt":
1752
          val = len(node_to_secondary[node.name])
1753
        elif field == "pip":
1754
          val = node.primary_ip
1755
        elif field == "sip":
1756
          val = node.secondary_ip
1757
        elif field == "tags":
1758
          val = list(node.GetTags())
1759
        elif field == "serial_no":
1760
          val = node.serial_no
1761
        elif field == "master_candidate":
1762
          val = node.master_candidate
1763
        elif field == "master":
1764
          val = node.name == master_node
1765
        elif self._FIELDS_DYNAMIC.Matches(field):
1766
          val = live_data[node.name].get(field, None)
1767
        else:
1768
          raise errors.ParameterError(field)
1769
        node_output.append(val)
1770
      output.append(node_output)
1771

    
1772
    return output
1773

    
1774

    
1775
class LUQueryNodeVolumes(NoHooksLU):
1776
  """Logical unit for getting volumes on node(s).
1777

1778
  """
1779
  _OP_REQP = ["nodes", "output_fields"]
1780
  REQ_BGL = False
1781
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1782
  _FIELDS_STATIC = utils.FieldSet("node")
1783

    
1784
  def ExpandNames(self):
1785
    _CheckOutputFields(static=self._FIELDS_STATIC,
1786
                       dynamic=self._FIELDS_DYNAMIC,
1787
                       selected=self.op.output_fields)
1788

    
1789
    self.needed_locks = {}
1790
    self.share_locks[locking.LEVEL_NODE] = 1
1791
    if not self.op.nodes:
1792
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1793
    else:
1794
      self.needed_locks[locking.LEVEL_NODE] = \
1795
        _GetWantedNodes(self, self.op.nodes)
1796

    
1797
  def CheckPrereq(self):
1798
    """Check prerequisites.
1799

1800
    This checks that the fields required are valid output fields.
1801

1802
    """
1803
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1804

    
1805
  def Exec(self, feedback_fn):
1806
    """Computes the list of nodes and their attributes.
1807

1808
    """
1809
    nodenames = self.nodes
1810
    volumes = self.rpc.call_node_volumes(nodenames)
1811

    
1812
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1813
             in self.cfg.GetInstanceList()]
1814

    
1815
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1816

    
1817
    output = []
1818
    for node in nodenames:
1819
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1820
        continue
1821

    
1822
      node_vols = volumes[node].data[:]
1823
      node_vols.sort(key=lambda vol: vol['dev'])
1824

    
1825
      for vol in node_vols:
1826
        node_output = []
1827
        for field in self.op.output_fields:
1828
          if field == "node":
1829
            val = node
1830
          elif field == "phys":
1831
            val = vol['dev']
1832
          elif field == "vg":
1833
            val = vol['vg']
1834
          elif field == "name":
1835
            val = vol['name']
1836
          elif field == "size":
1837
            val = int(float(vol['size']))
1838
          elif field == "instance":
1839
            for inst in ilist:
1840
              if node not in lv_by_node[inst]:
1841
                continue
1842
              if vol['name'] in lv_by_node[inst][node]:
1843
                val = inst.name
1844
                break
1845
            else:
1846
              val = '-'
1847
          else:
1848
            raise errors.ParameterError(field)
1849
          node_output.append(str(val))
1850

    
1851
        output.append(node_output)
1852

    
1853
    return output
1854

    
1855

    
1856
class LUAddNode(LogicalUnit):
1857
  """Logical unit for adding node to the cluster.
1858

1859
  """
1860
  HPATH = "node-add"
1861
  HTYPE = constants.HTYPE_NODE
1862
  _OP_REQP = ["node_name"]
1863

    
1864
  def BuildHooksEnv(self):
1865
    """Build hooks env.
1866

1867
    This will run on all nodes before, and on all nodes + the new node after.
1868

1869
    """
1870
    env = {
1871
      "OP_TARGET": self.op.node_name,
1872
      "NODE_NAME": self.op.node_name,
1873
      "NODE_PIP": self.op.primary_ip,
1874
      "NODE_SIP": self.op.secondary_ip,
1875
      }
1876
    nodes_0 = self.cfg.GetNodeList()
1877
    nodes_1 = nodes_0 + [self.op.node_name, ]
1878
    return env, nodes_0, nodes_1
1879

    
1880
  def CheckPrereq(self):
1881
    """Check prerequisites.
1882

1883
    This checks:
1884
     - the new node is not already in the config
1885
     - it is resolvable
1886
     - its parameters (single/dual homed) matches the cluster
1887

1888
    Any errors are signalled by raising errors.OpPrereqError.
1889

1890
    """
1891
    node_name = self.op.node_name
1892
    cfg = self.cfg
1893

    
1894
    dns_data = utils.HostInfo(node_name)
1895

    
1896
    node = dns_data.name
1897
    primary_ip = self.op.primary_ip = dns_data.ip
1898
    secondary_ip = getattr(self.op, "secondary_ip", None)
1899
    if secondary_ip is None:
1900
      secondary_ip = primary_ip
1901
    if not utils.IsValidIP(secondary_ip):
1902
      raise errors.OpPrereqError("Invalid secondary IP given")
1903
    self.op.secondary_ip = secondary_ip
1904

    
1905
    node_list = cfg.GetNodeList()
1906
    if not self.op.readd and node in node_list:
1907
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1908
                                 node)
1909
    elif self.op.readd and node not in node_list:
1910
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1911

    
1912
    for existing_node_name in node_list:
1913
      existing_node = cfg.GetNodeInfo(existing_node_name)
1914

    
1915
      if self.op.readd and node == existing_node_name:
1916
        if (existing_node.primary_ip != primary_ip or
1917
            existing_node.secondary_ip != secondary_ip):
1918
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1919
                                     " address configuration as before")
1920
        continue
1921

    
1922
      if (existing_node.primary_ip == primary_ip or
1923
          existing_node.secondary_ip == primary_ip or
1924
          existing_node.primary_ip == secondary_ip or
1925
          existing_node.secondary_ip == secondary_ip):
1926
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1927
                                   " existing node %s" % existing_node.name)
1928

    
1929
    # check that the type of the node (single versus dual homed) is the
1930
    # same as for the master
1931
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1932
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1933
    newbie_singlehomed = secondary_ip == primary_ip
1934
    if master_singlehomed != newbie_singlehomed:
1935
      if master_singlehomed:
1936
        raise errors.OpPrereqError("The master has no private ip but the"
1937
                                   " new node has one")
1938
      else:
1939
        raise errors.OpPrereqError("The master has a private ip but the"
1940
                                   " new node doesn't have one")
1941

    
1942
    # checks reachablity
1943
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1944
      raise errors.OpPrereqError("Node not reachable by ping")
1945

    
1946
    if not newbie_singlehomed:
1947
      # check reachability from my secondary ip to newbie's secondary ip
1948
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1949
                           source=myself.secondary_ip):
1950
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1951
                                   " based ping to noded port")
1952

    
1953
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1954
    node_info = self.cfg.GetAllNodesInfo().values()
1955
    num_candidates = len([n for n in node_info
1956
                          if n.master_candidate])
1957
    master_candidate = num_candidates < cp_size
1958

    
1959
    self.new_node = objects.Node(name=node,
1960
                                 primary_ip=primary_ip,
1961
                                 secondary_ip=secondary_ip,
1962
                                 master_candidate=master_candidate)
1963

    
1964
  def Exec(self, feedback_fn):
1965
    """Adds the new node to the cluster.
1966

1967
    """
1968
    new_node = self.new_node
1969
    node = new_node.name
1970

    
1971
    # check connectivity
1972
    result = self.rpc.call_version([node])[node]
1973
    result.Raise()
1974
    if result.data:
1975
      if constants.PROTOCOL_VERSION == result.data:
1976
        logging.info("Communication to node %s fine, sw version %s match",
1977
                     node, result.data)
1978
      else:
1979
        raise errors.OpExecError("Version mismatch master version %s,"
1980
                                 " node version %s" %
1981
                                 (constants.PROTOCOL_VERSION, result.data))
1982
    else:
1983
      raise errors.OpExecError("Cannot get version from the new node")
1984

    
1985
    # setup ssh on node
1986
    logging.info("Copy ssh key to node %s", node)
1987
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1988
    keyarray = []
1989
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1990
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1991
                priv_key, pub_key]
1992

    
1993
    for i in keyfiles:
1994
      f = open(i, 'r')
1995
      try:
1996
        keyarray.append(f.read())
1997
      finally:
1998
        f.close()
1999

    
2000
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2001
                                    keyarray[2],
2002
                                    keyarray[3], keyarray[4], keyarray[5])
2003

    
2004
    if result.failed or not result.data:
2005
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2006

    
2007
    # Add node to our /etc/hosts, and add key to known_hosts
2008
    utils.AddHostToEtcHosts(new_node.name)
2009

    
2010
    if new_node.secondary_ip != new_node.primary_ip:
2011
      result = self.rpc.call_node_has_ip_address(new_node.name,
2012
                                                 new_node.secondary_ip)
2013
      if result.failed or not result.data:
2014
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2015
                                 " you gave (%s). Please fix and re-run this"
2016
                                 " command." % new_node.secondary_ip)
2017

    
2018
    node_verify_list = [self.cfg.GetMasterNode()]
2019
    node_verify_param = {
2020
      'nodelist': [node],
2021
      # TODO: do a node-net-test as well?
2022
    }
2023

    
2024
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2025
                                       self.cfg.GetClusterName())
2026
    for verifier in node_verify_list:
2027
      if result[verifier].failed or not result[verifier].data:
2028
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2029
                                 " for remote verification" % verifier)
2030
      if result[verifier].data['nodelist']:
2031
        for failed in result[verifier].data['nodelist']:
2032
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2033
                      (verifier, result[verifier]['nodelist'][failed]))
2034
        raise errors.OpExecError("ssh/hostname verification failed.")
2035

    
2036
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2037
    # including the node just added
2038
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2039
    dist_nodes = self.cfg.GetNodeList()
2040
    if not self.op.readd:
2041
      dist_nodes.append(node)
2042
    if myself.name in dist_nodes:
2043
      dist_nodes.remove(myself.name)
2044

    
2045
    logging.debug("Copying hosts and known_hosts to all nodes")
2046
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2047
      result = self.rpc.call_upload_file(dist_nodes, fname)
2048
      for to_node, to_result in result.iteritems():
2049
        if to_result.failed or not to_result.data:
2050
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2051

    
2052
    to_copy = []
2053
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2054
      to_copy.append(constants.VNC_PASSWORD_FILE)
2055
    for fname in to_copy:
2056
      result = self.rpc.call_upload_file([node], fname)
2057
      if result[node].failed or not result[node]:
2058
        logging.error("Could not copy file %s to node %s", fname, node)
2059

    
2060
    if self.op.readd:
2061
      self.context.ReaddNode(new_node)
2062
    else:
2063
      self.context.AddNode(new_node)
2064

    
2065

    
2066
class LUSetNodeParams(LogicalUnit):
2067
  """Modifies the parameters of a node.
2068

2069
  """
2070
  HPATH = "node-modify"
2071
  HTYPE = constants.HTYPE_NODE
2072
  _OP_REQP = ["node_name"]
2073
  REQ_BGL = False
2074

    
2075
  def CheckArguments(self):
2076
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2077
    if node_name is None:
2078
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2079
    self.op.node_name = node_name
2080
    if not hasattr(self.op, 'master_candidate'):
2081
      raise errors.OpPrereqError("Please pass at least one modification")
2082
    self.op.master_candidate = bool(self.op.master_candidate)
2083

    
2084
  def ExpandNames(self):
2085
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2086

    
2087
  def BuildHooksEnv(self):
2088
    """Build hooks env.
2089

2090
    This runs on the master node.
2091

2092
    """
2093
    env = {
2094
      "OP_TARGET": self.op.node_name,
2095
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2096
      }
2097
    nl = [self.cfg.GetMasterNode(),
2098
          self.op.node_name]
2099
    return env, nl, nl
2100

    
2101
  def CheckPrereq(self):
2102
    """Check prerequisites.
2103

2104
    This only checks the instance list against the existing names.
2105

2106
    """
2107
    force = self.force = self.op.force
2108

    
2109
    if self.op.master_candidate == False:
2110
      if self.op.node_name == self.cfg.GetMasterNode():
2111
        raise errors.OpPrereqError("The master node has to be a"
2112
                                   " master candidate")
2113
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2114
      node_info = self.cfg.GetAllNodesInfo().values()
2115
      num_candidates = len([node for node in node_info
2116
                            if node.master_candidate])
2117
      if num_candidates <= cp_size:
2118
        msg = ("Not enough master candidates (desired"
2119
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2120
        if force:
2121
          self.LogWarning(msg)
2122
        else:
2123
          raise errors.OpPrereqError(msg)
2124

    
2125
    return
2126

    
2127
  def Exec(self, feedback_fn):
2128
    """Modifies a node.
2129

2130
    """
2131
    node = self.cfg.GetNodeInfo(self.op.node_name)
2132

    
2133
    result = []
2134

    
2135
    if self.op.master_candidate is not None:
2136
      node.master_candidate = self.op.master_candidate
2137
      result.append(("master_candidate", str(self.op.master_candidate)))
2138

    
2139
    # this will trigger configuration file update, if needed
2140
    self.cfg.Update(node)
2141
    # this will trigger job queue propagation or cleanup
2142
    if self.op.node_name != self.cfg.GetMasterNode():
2143
      self.context.ReaddNode(node)
2144

    
2145
    return result
2146

    
2147

    
2148
class LUQueryClusterInfo(NoHooksLU):
2149
  """Query cluster configuration.
2150

2151
  """
2152
  _OP_REQP = []
2153
  REQ_BGL = False
2154

    
2155
  def ExpandNames(self):
2156
    self.needed_locks = {}
2157

    
2158
  def CheckPrereq(self):
2159
    """No prerequsites needed for this LU.
2160

2161
    """
2162
    pass
2163

    
2164
  def Exec(self, feedback_fn):
2165
    """Return cluster config.
2166

2167
    """
2168
    cluster = self.cfg.GetClusterInfo()
2169
    result = {
2170
      "software_version": constants.RELEASE_VERSION,
2171
      "protocol_version": constants.PROTOCOL_VERSION,
2172
      "config_version": constants.CONFIG_VERSION,
2173
      "os_api_version": constants.OS_API_VERSION,
2174
      "export_version": constants.EXPORT_VERSION,
2175
      "architecture": (platform.architecture()[0], platform.machine()),
2176
      "name": cluster.cluster_name,
2177
      "master": cluster.master_node,
2178
      "default_hypervisor": cluster.default_hypervisor,
2179
      "enabled_hypervisors": cluster.enabled_hypervisors,
2180
      "hvparams": cluster.hvparams,
2181
      "beparams": cluster.beparams,
2182
      "candidate_pool_size": cluster.candidate_pool_size,
2183
      }
2184

    
2185
    return result
2186

    
2187

    
2188
class LUQueryConfigValues(NoHooksLU):
2189
  """Return configuration values.
2190

2191
  """
2192
  _OP_REQP = []
2193
  REQ_BGL = False
2194
  _FIELDS_DYNAMIC = utils.FieldSet()
2195
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2196

    
2197
  def ExpandNames(self):
2198
    self.needed_locks = {}
2199

    
2200
    _CheckOutputFields(static=self._FIELDS_STATIC,
2201
                       dynamic=self._FIELDS_DYNAMIC,
2202
                       selected=self.op.output_fields)
2203

    
2204
  def CheckPrereq(self):
2205
    """No prerequisites.
2206

2207
    """
2208
    pass
2209

    
2210
  def Exec(self, feedback_fn):
2211
    """Dump a representation of the cluster config to the standard output.
2212

2213
    """
2214
    values = []
2215
    for field in self.op.output_fields:
2216
      if field == "cluster_name":
2217
        entry = self.cfg.GetClusterName()
2218
      elif field == "master_node":
2219
        entry = self.cfg.GetMasterNode()
2220
      elif field == "drain_flag":
2221
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2222
      else:
2223
        raise errors.ParameterError(field)
2224
      values.append(entry)
2225
    return values
2226

    
2227

    
2228
class LUActivateInstanceDisks(NoHooksLU):
2229
  """Bring up an instance's disks.
2230

2231
  """
2232
  _OP_REQP = ["instance_name"]
2233
  REQ_BGL = False
2234

    
2235
  def ExpandNames(self):
2236
    self._ExpandAndLockInstance()
2237
    self.needed_locks[locking.LEVEL_NODE] = []
2238
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2239

    
2240
  def DeclareLocks(self, level):
2241
    if level == locking.LEVEL_NODE:
2242
      self._LockInstancesNodes()
2243

    
2244
  def CheckPrereq(self):
2245
    """Check prerequisites.
2246

2247
    This checks that the instance is in the cluster.
2248

2249
    """
2250
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2251
    assert self.instance is not None, \
2252
      "Cannot retrieve locked instance %s" % self.op.instance_name
2253

    
2254
  def Exec(self, feedback_fn):
2255
    """Activate the disks.
2256

2257
    """
2258
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2259
    if not disks_ok:
2260
      raise errors.OpExecError("Cannot activate block devices")
2261

    
2262
    return disks_info
2263

    
2264

    
2265
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2266
  """Prepare the block devices for an instance.
2267

2268
  This sets up the block devices on all nodes.
2269

2270
  @type lu: L{LogicalUnit}
2271
  @param lu: the logical unit on whose behalf we execute
2272
  @type instance: L{objects.Instance}
2273
  @param instance: the instance for whose disks we assemble
2274
  @type ignore_secondaries: boolean
2275
  @param ignore_secondaries: if true, errors on secondary nodes
2276
      won't result in an error return from the function
2277
  @return: False if the operation failed, otherwise a list of
2278
      (host, instance_visible_name, node_visible_name)
2279
      with the mapping from node devices to instance devices
2280

2281
  """
2282
  device_info = []
2283
  disks_ok = True
2284
  iname = instance.name
2285
  # With the two passes mechanism we try to reduce the window of
2286
  # opportunity for the race condition of switching DRBD to primary
2287
  # before handshaking occured, but we do not eliminate it
2288

    
2289
  # The proper fix would be to wait (with some limits) until the
2290
  # connection has been made and drbd transitions from WFConnection
2291
  # into any other network-connected state (Connected, SyncTarget,
2292
  # SyncSource, etc.)
2293

    
2294
  # 1st pass, assemble on all nodes in secondary mode
2295
  for inst_disk in instance.disks:
2296
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2297
      lu.cfg.SetDiskID(node_disk, node)
2298
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2299
      if result.failed or not result:
2300
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2301
                           " (is_primary=False, pass=1)",
2302
                           inst_disk.iv_name, node)
2303
        if not ignore_secondaries:
2304
          disks_ok = False
2305

    
2306
  # FIXME: race condition on drbd migration to primary
2307

    
2308
  # 2nd pass, do only the primary node
2309
  for inst_disk in instance.disks:
2310
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2311
      if node != instance.primary_node:
2312
        continue
2313
      lu.cfg.SetDiskID(node_disk, node)
2314
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2315
      if result.failed or not result:
2316
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2317
                           " (is_primary=True, pass=2)",
2318
                           inst_disk.iv_name, node)
2319
        disks_ok = False
2320
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2321

    
2322
  # leave the disks configured for the primary node
2323
  # this is a workaround that would be fixed better by
2324
  # improving the logical/physical id handling
2325
  for disk in instance.disks:
2326
    lu.cfg.SetDiskID(disk, instance.primary_node)
2327

    
2328
  return disks_ok, device_info
2329

    
2330

    
2331
def _StartInstanceDisks(lu, instance, force):
2332
  """Start the disks of an instance.
2333

2334
  """
2335
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2336
                                           ignore_secondaries=force)
2337
  if not disks_ok:
2338
    _ShutdownInstanceDisks(lu, instance)
2339
    if force is not None and not force:
2340
      lu.proc.LogWarning("", hint="If the message above refers to a"
2341
                         " secondary node,"
2342
                         " you can retry the operation using '--force'.")
2343
    raise errors.OpExecError("Disk consistency error")
2344

    
2345

    
2346
class LUDeactivateInstanceDisks(NoHooksLU):
2347
  """Shutdown an instance's disks.
2348

2349
  """
2350
  _OP_REQP = ["instance_name"]
2351
  REQ_BGL = False
2352

    
2353
  def ExpandNames(self):
2354
    self._ExpandAndLockInstance()
2355
    self.needed_locks[locking.LEVEL_NODE] = []
2356
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2357

    
2358
  def DeclareLocks(self, level):
2359
    if level == locking.LEVEL_NODE:
2360
      self._LockInstancesNodes()
2361

    
2362
  def CheckPrereq(self):
2363
    """Check prerequisites.
2364

2365
    This checks that the instance is in the cluster.
2366

2367
    """
2368
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2369
    assert self.instance is not None, \
2370
      "Cannot retrieve locked instance %s" % self.op.instance_name
2371

    
2372
  def Exec(self, feedback_fn):
2373
    """Deactivate the disks
2374

2375
    """
2376
    instance = self.instance
2377
    _SafeShutdownInstanceDisks(self, instance)
2378

    
2379

    
2380
def _SafeShutdownInstanceDisks(lu, instance):
2381
  """Shutdown block devices of an instance.
2382

2383
  This function checks if an instance is running, before calling
2384
  _ShutdownInstanceDisks.
2385

2386
  """
2387
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2388
                                      [instance.hypervisor])
2389
  ins_l = ins_l[instance.primary_node]
2390
  if ins_l.failed or not isinstance(ins_l.data, list):
2391
    raise errors.OpExecError("Can't contact node '%s'" %
2392
                             instance.primary_node)
2393

    
2394
  if instance.name in ins_l.data:
2395
    raise errors.OpExecError("Instance is running, can't shutdown"
2396
                             " block devices.")
2397

    
2398
  _ShutdownInstanceDisks(lu, instance)
2399

    
2400

    
2401
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2402
  """Shutdown block devices of an instance.
2403

2404
  This does the shutdown on all nodes of the instance.
2405

2406
  If the ignore_primary is false, errors on the primary node are
2407
  ignored.
2408

2409
  """
2410
  result = True
2411
  for disk in instance.disks:
2412
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2413
      lu.cfg.SetDiskID(top_disk, node)
2414
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2415
      if result.failed or not result.data:
2416
        logging.error("Could not shutdown block device %s on node %s",
2417
                      disk.iv_name, node)
2418
        if not ignore_primary or node != instance.primary_node:
2419
          result = False
2420
  return result
2421

    
2422

    
2423
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2424
  """Checks if a node has enough free memory.
2425

2426
  This function check if a given node has the needed amount of free
2427
  memory. In case the node has less memory or we cannot get the
2428
  information from the node, this function raise an OpPrereqError
2429
  exception.
2430

2431
  @type lu: C{LogicalUnit}
2432
  @param lu: a logical unit from which we get configuration data
2433
  @type node: C{str}
2434
  @param node: the node to check
2435
  @type reason: C{str}
2436
  @param reason: string to use in the error message
2437
  @type requested: C{int}
2438
  @param requested: the amount of memory in MiB to check for
2439
  @type hypervisor: C{str}
2440
  @param hypervisor: the hypervisor to ask for memory stats
2441
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2442
      we cannot check the node
2443

2444
  """
2445
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2446
  nodeinfo[node].Raise()
2447
  free_mem = nodeinfo[node].data.get('memory_free')
2448
  if not isinstance(free_mem, int):
2449
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2450
                             " was '%s'" % (node, free_mem))
2451
  if requested > free_mem:
2452
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2453
                             " needed %s MiB, available %s MiB" %
2454
                             (node, reason, requested, free_mem))
2455

    
2456

    
2457
class LUStartupInstance(LogicalUnit):
2458
  """Starts an instance.
2459

2460
  """
2461
  HPATH = "instance-start"
2462
  HTYPE = constants.HTYPE_INSTANCE
2463
  _OP_REQP = ["instance_name", "force"]
2464
  REQ_BGL = False
2465

    
2466
  def ExpandNames(self):
2467
    self._ExpandAndLockInstance()
2468

    
2469
  def BuildHooksEnv(self):
2470
    """Build hooks env.
2471

2472
    This runs on master, primary and secondary nodes of the instance.
2473

2474
    """
2475
    env = {
2476
      "FORCE": self.op.force,
2477
      }
2478
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2479
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2480
          list(self.instance.secondary_nodes))
2481
    return env, nl, nl
2482

    
2483
  def CheckPrereq(self):
2484
    """Check prerequisites.
2485

2486
    This checks that the instance is in the cluster.
2487

2488
    """
2489
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2490
    assert self.instance is not None, \
2491
      "Cannot retrieve locked instance %s" % self.op.instance_name
2492

    
2493
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2494
    # check bridges existance
2495
    _CheckInstanceBridgesExist(self, instance)
2496

    
2497
    _CheckNodeFreeMemory(self, instance.primary_node,
2498
                         "starting instance %s" % instance.name,
2499
                         bep[constants.BE_MEMORY], instance.hypervisor)
2500

    
2501
  def Exec(self, feedback_fn):
2502
    """Start the instance.
2503

2504
    """
2505
    instance = self.instance
2506
    force = self.op.force
2507
    extra_args = getattr(self.op, "extra_args", "")
2508

    
2509
    self.cfg.MarkInstanceUp(instance.name)
2510

    
2511
    node_current = instance.primary_node
2512

    
2513
    _StartInstanceDisks(self, instance, force)
2514

    
2515
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2516
    if result.failed or not result.data:
2517
      _ShutdownInstanceDisks(self, instance)
2518
      raise errors.OpExecError("Could not start instance")
2519

    
2520

    
2521
class LURebootInstance(LogicalUnit):
2522
  """Reboot an instance.
2523

2524
  """
2525
  HPATH = "instance-reboot"
2526
  HTYPE = constants.HTYPE_INSTANCE
2527
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2528
  REQ_BGL = False
2529

    
2530
  def ExpandNames(self):
2531
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2532
                                   constants.INSTANCE_REBOOT_HARD,
2533
                                   constants.INSTANCE_REBOOT_FULL]:
2534
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2535
                                  (constants.INSTANCE_REBOOT_SOFT,
2536
                                   constants.INSTANCE_REBOOT_HARD,
2537
                                   constants.INSTANCE_REBOOT_FULL))
2538
    self._ExpandAndLockInstance()
2539

    
2540
  def BuildHooksEnv(self):
2541
    """Build hooks env.
2542

2543
    This runs on master, primary and secondary nodes of the instance.
2544

2545
    """
2546
    env = {
2547
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2548
      }
2549
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2550
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2551
          list(self.instance.secondary_nodes))
2552
    return env, nl, nl
2553

    
2554
  def CheckPrereq(self):
2555
    """Check prerequisites.
2556

2557
    This checks that the instance is in the cluster.
2558

2559
    """
2560
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2561
    assert self.instance is not None, \
2562
      "Cannot retrieve locked instance %s" % self.op.instance_name
2563

    
2564
    # check bridges existance
2565
    _CheckInstanceBridgesExist(self, instance)
2566

    
2567
  def Exec(self, feedback_fn):
2568
    """Reboot the instance.
2569

2570
    """
2571
    instance = self.instance
2572
    ignore_secondaries = self.op.ignore_secondaries
2573
    reboot_type = self.op.reboot_type
2574
    extra_args = getattr(self.op, "extra_args", "")
2575

    
2576
    node_current = instance.primary_node
2577

    
2578
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2579
                       constants.INSTANCE_REBOOT_HARD]:
2580
      result = self.rpc.call_instance_reboot(node_current, instance,
2581
                                             reboot_type, extra_args)
2582
      if result.failed or not result.data:
2583
        raise errors.OpExecError("Could not reboot instance")
2584
    else:
2585
      if not self.rpc.call_instance_shutdown(node_current, instance):
2586
        raise errors.OpExecError("could not shutdown instance for full reboot")
2587
      _ShutdownInstanceDisks(self, instance)
2588
      _StartInstanceDisks(self, instance, ignore_secondaries)
2589
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2590
      if result.failed or not result.data:
2591
        _ShutdownInstanceDisks(self, instance)
2592
        raise errors.OpExecError("Could not start instance for full reboot")
2593

    
2594
    self.cfg.MarkInstanceUp(instance.name)
2595

    
2596

    
2597
class LUShutdownInstance(LogicalUnit):
2598
  """Shutdown an instance.
2599

2600
  """
2601
  HPATH = "instance-stop"
2602
  HTYPE = constants.HTYPE_INSTANCE
2603
  _OP_REQP = ["instance_name"]
2604
  REQ_BGL = False
2605

    
2606
  def ExpandNames(self):
2607
    self._ExpandAndLockInstance()
2608

    
2609
  def BuildHooksEnv(self):
2610
    """Build hooks env.
2611

2612
    This runs on master, primary and secondary nodes of the instance.
2613

2614
    """
2615
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2616
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2617
          list(self.instance.secondary_nodes))
2618
    return env, nl, nl
2619

    
2620
  def CheckPrereq(self):
2621
    """Check prerequisites.
2622

2623
    This checks that the instance is in the cluster.
2624

2625
    """
2626
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2627
    assert self.instance is not None, \
2628
      "Cannot retrieve locked instance %s" % self.op.instance_name
2629

    
2630
  def Exec(self, feedback_fn):
2631
    """Shutdown the instance.
2632

2633
    """
2634
    instance = self.instance
2635
    node_current = instance.primary_node
2636
    self.cfg.MarkInstanceDown(instance.name)
2637
    result = self.rpc.call_instance_shutdown(node_current, instance)
2638
    if result.failed or not result.data:
2639
      self.proc.LogWarning("Could not shutdown instance")
2640

    
2641
    _ShutdownInstanceDisks(self, instance)
2642

    
2643

    
2644
class LUReinstallInstance(LogicalUnit):
2645
  """Reinstall an instance.
2646

2647
  """
2648
  HPATH = "instance-reinstall"
2649
  HTYPE = constants.HTYPE_INSTANCE
2650
  _OP_REQP = ["instance_name"]
2651
  REQ_BGL = False
2652

    
2653
  def ExpandNames(self):
2654
    self._ExpandAndLockInstance()
2655

    
2656
  def BuildHooksEnv(self):
2657
    """Build hooks env.
2658

2659
    This runs on master, primary and secondary nodes of the instance.
2660

2661
    """
2662
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2663
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2664
          list(self.instance.secondary_nodes))
2665
    return env, nl, nl
2666

    
2667
  def CheckPrereq(self):
2668
    """Check prerequisites.
2669

2670
    This checks that the instance is in the cluster and is not running.
2671

2672
    """
2673
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2674
    assert instance is not None, \
2675
      "Cannot retrieve locked instance %s" % self.op.instance_name
2676

    
2677
    if instance.disk_template == constants.DT_DISKLESS:
2678
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2679
                                 self.op.instance_name)
2680
    if instance.status != "down":
2681
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2682
                                 self.op.instance_name)
2683
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2684
                                              instance.name,
2685
                                              instance.hypervisor)
2686
    if remote_info.failed or remote_info.data:
2687
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2688
                                 (self.op.instance_name,
2689
                                  instance.primary_node))
2690

    
2691
    self.op.os_type = getattr(self.op, "os_type", None)
2692
    if self.op.os_type is not None:
2693
      # OS verification
2694
      pnode = self.cfg.GetNodeInfo(
2695
        self.cfg.ExpandNodeName(instance.primary_node))
2696
      if pnode is None:
2697
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2698
                                   self.op.pnode)
2699
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2700
      result.Raise()
2701
      if not isinstance(result.data, objects.OS):
2702
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2703
                                   " primary node"  % self.op.os_type)
2704

    
2705
    self.instance = instance
2706

    
2707
  def Exec(self, feedback_fn):
2708
    """Reinstall the instance.
2709

2710
    """
2711
    inst = self.instance
2712

    
2713
    if self.op.os_type is not None:
2714
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2715
      inst.os = self.op.os_type
2716
      self.cfg.Update(inst)
2717

    
2718
    _StartInstanceDisks(self, inst, None)
2719
    try:
2720
      feedback_fn("Running the instance OS create scripts...")
2721
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2722
      result.Raise()
2723
      if not result.data:
2724
        raise errors.OpExecError("Could not install OS for instance %s"
2725
                                 " on node %s" %
2726
                                 (inst.name, inst.primary_node))
2727
    finally:
2728
      _ShutdownInstanceDisks(self, inst)
2729

    
2730

    
2731
class LURenameInstance(LogicalUnit):
2732
  """Rename an instance.
2733

2734
  """
2735
  HPATH = "instance-rename"
2736
  HTYPE = constants.HTYPE_INSTANCE
2737
  _OP_REQP = ["instance_name", "new_name"]
2738

    
2739
  def BuildHooksEnv(self):
2740
    """Build hooks env.
2741

2742
    This runs on master, primary and secondary nodes of the instance.
2743

2744
    """
2745
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2746
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2747
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2748
          list(self.instance.secondary_nodes))
2749
    return env, nl, nl
2750

    
2751
  def CheckPrereq(self):
2752
    """Check prerequisites.
2753

2754
    This checks that the instance is in the cluster and is not running.
2755

2756
    """
2757
    instance = self.cfg.GetInstanceInfo(
2758
      self.cfg.ExpandInstanceName(self.op.instance_name))
2759
    if instance is None:
2760
      raise errors.OpPrereqError("Instance '%s' not known" %
2761
                                 self.op.instance_name)
2762
    if instance.status != "down":
2763
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2764
                                 self.op.instance_name)
2765
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2766
                                              instance.name,
2767
                                              instance.hypervisor)
2768
    remote_info.Raise()
2769
    if remote_info.data:
2770
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2771
                                 (self.op.instance_name,
2772
                                  instance.primary_node))
2773
    self.instance = instance
2774

    
2775
    # new name verification
2776
    name_info = utils.HostInfo(self.op.new_name)
2777

    
2778
    self.op.new_name = new_name = name_info.name
2779
    instance_list = self.cfg.GetInstanceList()
2780
    if new_name in instance_list:
2781
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2782
                                 new_name)
2783

    
2784
    if not getattr(self.op, "ignore_ip", False):
2785
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2786
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2787
                                   (name_info.ip, new_name))
2788

    
2789

    
2790
  def Exec(self, feedback_fn):
2791
    """Reinstall the instance.
2792

2793
    """
2794
    inst = self.instance
2795
    old_name = inst.name
2796

    
2797
    if inst.disk_template == constants.DT_FILE:
2798
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2799

    
2800
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2801
    # Change the instance lock. This is definitely safe while we hold the BGL
2802
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2803
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2804

    
2805
    # re-read the instance from the configuration after rename
2806
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2807

    
2808
    if inst.disk_template == constants.DT_FILE:
2809
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2810
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2811
                                                     old_file_storage_dir,
2812
                                                     new_file_storage_dir)
2813
      result.Raise()
2814
      if not result.data:
2815
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2816
                                 " directory '%s' to '%s' (but the instance"
2817
                                 " has been renamed in Ganeti)" % (
2818
                                 inst.primary_node, old_file_storage_dir,
2819
                                 new_file_storage_dir))
2820

    
2821
      if not result.data[0]:
2822
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2823
                                 " (but the instance has been renamed in"
2824
                                 " Ganeti)" % (old_file_storage_dir,
2825
                                               new_file_storage_dir))
2826

    
2827
    _StartInstanceDisks(self, inst, None)
2828
    try:
2829
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2830
                                                 old_name)
2831
      if result.failed or not result.data:
2832
        msg = ("Could not run OS rename script for instance %s on node %s"
2833
               " (but the instance has been renamed in Ganeti)" %
2834
               (inst.name, inst.primary_node))
2835
        self.proc.LogWarning(msg)
2836
    finally:
2837
      _ShutdownInstanceDisks(self, inst)
2838

    
2839

    
2840
class LURemoveInstance(LogicalUnit):
2841
  """Remove an instance.
2842

2843
  """
2844
  HPATH = "instance-remove"
2845
  HTYPE = constants.HTYPE_INSTANCE
2846
  _OP_REQP = ["instance_name", "ignore_failures"]
2847
  REQ_BGL = False
2848

    
2849
  def ExpandNames(self):
2850
    self._ExpandAndLockInstance()
2851
    self.needed_locks[locking.LEVEL_NODE] = []
2852
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2853

    
2854
  def DeclareLocks(self, level):
2855
    if level == locking.LEVEL_NODE:
2856
      self._LockInstancesNodes()
2857

    
2858
  def BuildHooksEnv(self):
2859
    """Build hooks env.
2860

2861
    This runs on master, primary and secondary nodes of the instance.
2862

2863
    """
2864
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2865
    nl = [self.cfg.GetMasterNode()]
2866
    return env, nl, nl
2867

    
2868
  def CheckPrereq(self):
2869
    """Check prerequisites.
2870

2871
    This checks that the instance is in the cluster.
2872

2873
    """
2874
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2875
    assert self.instance is not None, \
2876
      "Cannot retrieve locked instance %s" % self.op.instance_name
2877

    
2878
  def Exec(self, feedback_fn):
2879
    """Remove the instance.
2880

2881
    """
2882
    instance = self.instance
2883
    logging.info("Shutting down instance %s on node %s",
2884
                 instance.name, instance.primary_node)
2885

    
2886
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2887
    if result.failed or not result.data:
2888
      if self.op.ignore_failures:
2889
        feedback_fn("Warning: can't shutdown instance")
2890
      else:
2891
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2892
                                 (instance.name, instance.primary_node))
2893

    
2894
    logging.info("Removing block devices for instance %s", instance.name)
2895

    
2896
    if not _RemoveDisks(self, instance):
2897
      if self.op.ignore_failures:
2898
        feedback_fn("Warning: can't remove instance's disks")
2899
      else:
2900
        raise errors.OpExecError("Can't remove instance's disks")
2901

    
2902
    logging.info("Removing instance %s out of cluster config", instance.name)
2903

    
2904
    self.cfg.RemoveInstance(instance.name)
2905
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2906

    
2907

    
2908
class LUQueryInstances(NoHooksLU):
2909
  """Logical unit for querying instances.
2910

2911
  """
2912
  _OP_REQP = ["output_fields", "names"]
2913
  REQ_BGL = False
2914
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2915
                                    "admin_state", "admin_ram",
2916
                                    "disk_template", "ip", "mac", "bridge",
2917
                                    "sda_size", "sdb_size", "vcpus", "tags",
2918
                                    "network_port", "beparams",
2919
                                    "(disk).(size)/([0-9]+)",
2920
                                    "(disk).(sizes)",
2921
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2922
                                    "(nic).(macs|ips|bridges)",
2923
                                    "(disk|nic).(count)",
2924
                                    "serial_no", "hypervisor", "hvparams",] +
2925
                                  ["hv/%s" % name
2926
                                   for name in constants.HVS_PARAMETERS] +
2927
                                  ["be/%s" % name
2928
                                   for name in constants.BES_PARAMETERS])
2929
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
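  # The parenthesised entries above are regex-like patterns rather than plain
  # field names: queries such as "disk.size/0", "nic.mac/1", "disk.count" or
  # "nic.bridges" are matched against them, and the captured groups are
  # interpreted later in Exec().  A minimal sketch of how such a field set
  # presumably behaves (an illustration only, not the real utils.FieldSet
  # implementation):
  #
  #   import re
  #
  #   class _SimpleFieldSet(object):
  #     def __init__(self, *items):
  #       # one anchored regex per declared field pattern
  #       self.items = [re.compile("^%s$" % value) for value in items]
  #     def Matches(self, field):
  #       for item in self.items:
  #         m = item.match(field)
  #         if m:
  #           return m
  #       return None
  #     def NonMatching(self, items):
  #       return [val for val in items if not self.Matches(val)]
  #
  #   fset = _SimpleFieldSet("name", r"(disk)\.(size)/([0-9]+)")
  #   fset.Matches("disk.size/0").groups()   # ('disk', 'size', '0')
  #   fset.Matches("name").groups()          # ()
  #   fset.NonMatching(["name", "bogus"])    # ['bogus']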
2930

    
2931

    
2932
  def ExpandNames(self):
2933
    _CheckOutputFields(static=self._FIELDS_STATIC,
2934
                       dynamic=self._FIELDS_DYNAMIC,
2935
                       selected=self.op.output_fields)
2936

    
2937
    self.needed_locks = {}
2938
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2939
    self.share_locks[locking.LEVEL_NODE] = 1
2940

    
2941
    if self.op.names:
2942
      self.wanted = _GetWantedInstances(self, self.op.names)
2943
    else:
2944
      self.wanted = locking.ALL_SET
2945

    
2946
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2947
    if self.do_locking:
2948
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2949
      self.needed_locks[locking.LEVEL_NODE] = []
2950
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2951

    
2952
  def DeclareLocks(self, level):
2953
    if level == locking.LEVEL_NODE and self.do_locking:
2954
      self._LockInstancesNodes()
2955

    
2956
  def CheckPrereq(self):
2957
    """Check prerequisites.
2958

2959
    """
2960
    pass
2961

    
2962
  def Exec(self, feedback_fn):
2963
    """Computes the list of nodes and their attributes.
2964

2965
    """
2966
    all_info = self.cfg.GetAllInstancesInfo()
2967
    if self.do_locking:
2968
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2969
    elif self.wanted != locking.ALL_SET:
2970
      instance_names = self.wanted
2971
      missing = set(instance_names).difference(all_info.keys())
2972
      if missing:
2973
        raise errors.OpExecError(
2974
          "Some instances were removed before retrieving their data: %s"
2975
          % missing)
2976
    else:
2977
      instance_names = all_info.keys()
2978

    
2979
    instance_names = utils.NiceSort(instance_names)
2980
    instance_list = [all_info[iname] for iname in instance_names]
2981

    
2982
    # begin data gathering
2983

    
2984
    nodes = frozenset([inst.primary_node for inst in instance_list])
2985
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2986

    
2987
    bad_nodes = []
2988
    if self.do_locking:
2989
      live_data = {}
2990
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2991
      for name in nodes:
2992
        result = node_data[name]
2993
        if result.failed:
2994
          bad_nodes.append(name)
2995
        else:
2996
          if result.data:
2997
            live_data.update(result.data)
2998
            # else no instance is alive
2999
    else:
3000
      live_data = dict([(name, {}) for name in instance_names])
3001

    
3002
    # end data gathering
3003

    
3004
    HVPREFIX = "hv/"
3005
    BEPREFIX = "be/"
3006
    output = []
3007
    for instance in instance_list:
3008
      iout = []
3009
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3010
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3011
      for field in self.op.output_fields:
3012
        st_match = self._FIELDS_STATIC.Matches(field)
3013
        if field == "name":
3014
          val = instance.name
3015
        elif field == "os":
3016
          val = instance.os
3017
        elif field == "pnode":
3018
          val = instance.primary_node
3019
        elif field == "snodes":
3020
          val = list(instance.secondary_nodes)
3021
        elif field == "admin_state":
3022
          val = (instance.status != "down")
3023
        elif field == "oper_state":
3024
          if instance.primary_node in bad_nodes:
3025
            val = None
3026
          else:
3027
            val = bool(live_data.get(instance.name))
3028
        elif field == "status":
3029
          if instance.primary_node in bad_nodes:
3030
            val = "ERROR_nodedown"
3031
          else:
3032
            running = bool(live_data.get(instance.name))
3033
            if running:
3034
              if instance.status != "down":
3035
                val = "running"
3036
              else:
3037
                val = "ERROR_up"
3038
            else:
3039
              if instance.status != "down":
3040
                val = "ERROR_down"
3041
              else:
3042
                val = "ADMIN_down"
3043
        elif field == "oper_ram":
3044
          if instance.primary_node in bad_nodes:
3045
            val = None
3046
          elif instance.name in live_data:
3047
            val = live_data[instance.name].get("memory", "?")
3048
          else:
3049
            val = "-"
3050
        elif field == "disk_template":
3051
          val = instance.disk_template
3052
        elif field == "ip":
3053
          val = instance.nics[0].ip
3054
        elif field == "bridge":
3055
          val = instance.nics[0].bridge
3056
        elif field == "mac":
3057
          val = instance.nics[0].mac
3058
        elif field == "sda_size" or field == "sdb_size":
3059
          idx = ord(field[2]) - ord('a')
3060
          try:
3061
            val = instance.FindDisk(idx).size
3062
          except errors.OpPrereqError:
3063
            val = None
3064
        elif field == "tags":
3065
          val = list(instance.GetTags())
3066
        elif field == "serial_no":
3067
          val = instance.serial_no
3068
        elif field == "network_port":
3069
          val = instance.network_port
3070
        elif field == "hypervisor":
3071
          val = instance.hypervisor
3072
        elif field == "hvparams":
3073
          val = i_hv
3074
        elif (field.startswith(HVPREFIX) and
3075
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3076
          val = i_hv.get(field[len(HVPREFIX):], None)
3077
        elif field == "beparams":
3078
          val = i_be
3079
        elif (field.startswith(BEPREFIX) and
3080
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3081
          val = i_be.get(field[len(BEPREFIX):], None)
3082
        elif st_match and st_match.groups():
3083
          # matches a variable list
3084
          st_groups = st_match.groups()
3085
          if st_groups and st_groups[0] == "disk":
3086
            if st_groups[1] == "count":
3087
              val = len(instance.disks)
3088
            elif st_groups[1] == "sizes":
3089
              val = [disk.size for disk in instance.disks]
3090
            elif st_groups[1] == "size":
3091
              try:
3092
                val = instance.FindDisk(st_groups[2]).size
3093
              except errors.OpPrereqError:
3094
                val = None
3095
            else:
3096
              assert False, "Unhandled disk parameter"
3097
          elif st_groups[0] == "nic":
3098
            if st_groups[1] == "count":
3099
              val = len(instance.nics)
3100
            elif st_groups[1] == "macs":
3101
              val = [nic.mac for nic in instance.nics]
3102
            elif st_groups[1] == "ips":
3103
              val = [nic.ip for nic in instance.nics]
3104
            elif st_groups[1] == "bridges":
3105
              val = [nic.bridge for nic in instance.nics]
3106
            else:
3107
              # index-based item
3108
              nic_idx = int(st_groups[2])
3109
              if nic_idx >= len(instance.nics):
3110
                val = None
3111
              else:
3112
                if st_groups[1] == "mac":
3113
                  val = instance.nics[nic_idx].mac
3114
                elif st_groups[1] == "ip":
3115
                  val = instance.nics[nic_idx].ip
3116
                elif st_groups[1] == "bridge":
3117
                  val = instance.nics[nic_idx].bridge
3118
                else:
3119
                  assert False, "Unhandled NIC parameter"
3120
          else:
3121
            assert False, "Unhandled variable parameter"
3122
        else:
3123
          raise errors.ParameterError(field)
3124
        iout.append(val)
3125
      output.append(iout)
3126

    
3127
    return output
3128

    
3129

    
3130
class LUFailoverInstance(LogicalUnit):
3131
  """Failover an instance.
3132

3133
  """
3134
  HPATH = "instance-failover"
3135
  HTYPE = constants.HTYPE_INSTANCE
3136
  _OP_REQP = ["instance_name", "ignore_consistency"]
3137
  REQ_BGL = False
3138

    
3139
  def ExpandNames(self):
3140
    self._ExpandAndLockInstance()
3141
    self.needed_locks[locking.LEVEL_NODE] = []
3142
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3143

    
3144
  def DeclareLocks(self, level):
3145
    if level == locking.LEVEL_NODE:
3146
      self._LockInstancesNodes()
3147

    
3148
  def BuildHooksEnv(self):
3149
    """Build hooks env.
3150

3151
    This runs on master, primary and secondary nodes of the instance.
3152

3153
    """
3154
    env = {
3155
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3156
      }
3157
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3158
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3159
    return env, nl, nl
3160

    
3161
  def CheckPrereq(self):
3162
    """Check prerequisites.
3163

3164
    This checks that the instance is in the cluster.
3165

3166
    """
3167
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3168
    assert self.instance is not None, \
3169
      "Cannot retrieve locked instance %s" % self.op.instance_name
3170

    
3171
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3172
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3173
      raise errors.OpPrereqError("Instance's disk layout is not"
3174
                                 " network mirrored, cannot failover.")
3175

    
3176
    secondary_nodes = instance.secondary_nodes
3177
    if not secondary_nodes:
3178
      raise errors.ProgrammerError("no secondary node but using "
3179
                                   "a mirrored disk template")
3180

    
3181
    target_node = secondary_nodes[0]
3182
    # check memory requirements on the secondary node
3183
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3184
                         instance.name, bep[constants.BE_MEMORY],
3185
                         instance.hypervisor)
3186

    
3187
    # check bridge existence
3188
    brlist = [nic.bridge for nic in instance.nics]
3189
    result = self.rpc.call_bridges_exist(target_node, brlist)
3190
    result.Raise()
3191
    if not result.data:
3192
      raise errors.OpPrereqError("One or more target bridges %s does not"
3193
                                 " exist on destination node '%s'" %
3194
                                 (brlist, target_node))
3195

    
3196
  def Exec(self, feedback_fn):
3197
    """Failover an instance.
3198

3199
    The failover is done by shutting it down on its present node and
3200
    starting it on the secondary.
3201

3202
    """
3203
    instance = self.instance
3204

    
3205
    source_node = instance.primary_node
3206
    target_node = instance.secondary_nodes[0]
3207

    
3208
    feedback_fn("* checking disk consistency between source and target")
3209
    for dev in instance.disks:
3210
      # for drbd, these are drbd over lvm
3211
      if not _CheckDiskConsistency(self, dev, target_node, False):
3212
        if instance.status == "up" and not self.op.ignore_consistency:
3213
          raise errors.OpExecError("Disk %s is degraded on target node,"
3214
                                   " aborting failover." % dev.iv_name)
3215

    
3216
    feedback_fn("* shutting down instance on source node")
3217
    logging.info("Shutting down instance %s on node %s",
3218
                 instance.name, source_node)
3219

    
3220
    result = self.rpc.call_instance_shutdown(source_node, instance)
3221
    if result.failed or not result.data:
3222
      if self.op.ignore_consistency:
3223
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3224
                             " Proceeding"
3225
                             " anyway. Please make sure node %s is down",
3226
                             instance.name, source_node, source_node)
3227
      else:
3228
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3229
                                 (instance.name, source_node))
3230

    
3231
    feedback_fn("* deactivating the instance's disks on source node")
3232
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3233
      raise errors.OpExecError("Can't shut down the instance's disks.")
3234

    
3235
    instance.primary_node = target_node
3236
    # distribute new instance config to the other nodes
3237
    self.cfg.Update(instance)
3238

    
3239
    # Only start the instance if it's marked as up
3240
    if instance.status == "up":
3241
      feedback_fn("* activating the instance's disks on target node")
3242
      logging.info("Starting instance %s on node %s",
3243
                   instance.name, target_node)
3244

    
3245
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3246
                                               ignore_secondaries=True)
3247
      if not disks_ok:
3248
        _ShutdownInstanceDisks(self, instance)
3249
        raise errors.OpExecError("Can't activate the instance's disks")
3250

    
3251
      feedback_fn("* starting the instance on the target node")
3252
      result = self.rpc.call_instance_start(target_node, instance, None)
3253
      if result.failed or not result.data:
3254
        _ShutdownInstanceDisks(self, instance)
3255
        raise errors.OpExecError("Could not start instance %s on node %s." %
3256
                                 (instance.name, target_node))
3257

    
3258

    
3259
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3260
  """Create a tree of block devices on the primary node.
3261

3262
  This always creates all devices.
3263

3264
  """
3265
  if device.children:
3266
    for child in device.children:
3267
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3268
        return False
3269

    
3270
  lu.cfg.SetDiskID(device, node)
3271
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3272
                                       instance.name, True, info)
3273
  if new_id.failed or not new_id.data:
3274
    return False
3275
  if device.physical_id is None:
3276
    device.physical_id = new_id
3277
  return True
3278

    
3279

    
3280
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3281
  """Create a tree of block devices on a secondary node.
3282

3283
  If this device type has to be created on secondaries, create it and
3284
  all its children.
3285

3286
  If not, just recurse to children keeping the same 'force' value.
3287

3288
  """
3289
  if device.CreateOnSecondary():
3290
    force = True
3291
  if device.children:
3292
    for child in device.children:
3293
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3294
                                        child, force, info):
3295
        return False
3296

    
3297
  if not force:
3298
    return True
3299
  lu.cfg.SetDiskID(device, node)
3300
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3301
                                       instance.name, False, info)
3302
  if new_id.failed or not new_id.data:
3303
    return False
3304
  if device.physical_id is None:
3305
    device.physical_id = new_id
3306
  return True
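# A rough illustration of the 'force' propagation above, assuming (as the
# usage elsewhere in this file suggests) that DRBD8 devices report
# CreateOnSecondary() == True while plain LVs report False: for a DRBD8 disk
# backed by two LV children the call starts with force=False, the DRBD8
# device flips it to True, the recursion then creates both LV children on the
# secondary node, and finally the DRBD8 device itself is created there.  For
# a top-level plain LV nothing would be created, which matches DT_PLAIN not
# allowing secondary nodes at all.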
3307

    
3308

    
3309
def _GenerateUniqueNames(lu, exts):
3310
  """Generate a suitable LV name.
3311

3312
  This will generate a unique logical volume name for each given extension.
3313

3314
  """
3315
  results = []
3316
  for val in exts:
3317
    new_id = lu.cfg.GenerateUniqueID()
3318
    results.append("%s%s" % (new_id, val))
3319
  return results
3320

    
3321

    
3322
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3323
                         p_minor, s_minor):
3324
  """Generate a drbd8 device complete with its children.
3325

3326
  """
3327
  port = lu.cfg.AllocatePort()
3328
  vgname = lu.cfg.GetVGName()
3329
  shared_secret = lu.cfg.GenerateDRBDSecret()
3330
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3331
                          logical_id=(vgname, names[0]))
3332
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3333
                          logical_id=(vgname, names[1]))
3334
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3335
                          logical_id=(primary, secondary, port,
3336
                                      p_minor, s_minor,
3337
                                      shared_secret),
3338
                          children=[dev_data, dev_meta],
3339
                          iv_name=iv_name)
3340
  return drbd_dev
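# For illustration, the object returned above is roughly the following tree
# (the port, minors, shared secret and LV names are whatever the
# configuration allocated, e.g. names produced by _GenerateUniqueNames):
#
#   Disk(dev_type=LD_DRBD8, size=size, iv_name=iv_name,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        children=[Disk(dev_type=LD_LV, size=size,
#                       logical_id=(vgname, names[0])),
#                  Disk(dev_type=LD_LV, size=128,
#                       logical_id=(vgname, names[1]))])
#
# i.e. a DRBD8 device backed by a data LV of the requested size plus a fixed
# 128 MB metadata LV.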
3341

    
3342

    
3343
def _GenerateDiskTemplate(lu, template_name,
3344
                          instance_name, primary_node,
3345
                          secondary_nodes, disk_info,
3346
                          file_storage_dir, file_driver,
3347
                          base_index):
3348
  """Generate the entire disk layout for a given template type.
3349

3350
  """
3351
  #TODO: compute space requirements
3352

    
3353
  vgname = lu.cfg.GetVGName()
3354
  disk_count = len(disk_info)
3355
  disks = []
3356
  if template_name == constants.DT_DISKLESS:
3357
    pass
3358
  elif template_name == constants.DT_PLAIN:
3359
    if len(secondary_nodes) != 0:
3360
      raise errors.ProgrammerError("Wrong template configuration")
3361

    
3362
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3363
                                      for i in range(disk_count)])
3364
    for idx, disk in enumerate(disk_info):
3365
      disk_index = idx + base_index
3366
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3367
                              logical_id=(vgname, names[idx]),
3368
                              iv_name="disk/%d" % disk_index)
3369
      disks.append(disk_dev)
3370
  elif template_name == constants.DT_DRBD8:
3371
    if len(secondary_nodes) != 1:
3372
      raise errors.ProgrammerError("Wrong template configuration")
3373
    remote_node = secondary_nodes[0]
3374
    minors = lu.cfg.AllocateDRBDMinor(
3375
      [primary_node, remote_node] * len(disk_info), instance_name)
3376

    
3377
    names = _GenerateUniqueNames(lu,
3378
                                 [".disk%d_%s" % (i, s)
3379
                                  for i in range(disk_count)
3380
                                  for s in ("data", "meta")
3381
                                  ])
3382
    for idx, disk in enumerate(disk_info):
3383
      disk_index = idx + base_index
3384
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3385
                                      disk["size"], names[idx*2:idx*2+2],
3386
                                      "disk/%d" % disk_index,
3387
                                      minors[idx*2], minors[idx*2+1])
3388
      disks.append(disk_dev)
3389
  elif template_name == constants.DT_FILE:
3390
    if len(secondary_nodes) != 0:
3391
      raise errors.ProgrammerError("Wrong template configuration")
3392

    
3393
    for idx, disk in enumerate(disk_info):
3394
      disk_index = idx + base_index
3395
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3396
                              iv_name="disk/%d" % disk_index,
3397
                              logical_id=(file_driver,
3398
                                          "%s/disk%d" % (file_storage_dir,
3399
                                                         idx)))
3400
      disks.append(disk_dev)
3401
  else:
3402
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3403
  return disks
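# As a concrete illustration of the DT_DRBD8 branch above, for an instance
# with two disks the helper lists line up as follows (UUID prefixes
# abbreviated):
#
#   names  = ["<uuid1>.disk0_data", "<uuid2>.disk0_meta",
#             "<uuid3>.disk1_data", "<uuid4>.disk1_meta"]
#   minors = [p_minor0, s_minor0, p_minor1, s_minor1]
#
# so disk idx uses names[idx*2:idx*2+2] and minors[idx*2], minors[idx*2+1],
# and its iv_name becomes "disk/%d" % (idx + base_index).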
3404

    
3405

    
3406
def _GetInstanceInfoText(instance):
3407
  """Compute that text that should be added to the disk's metadata.
3408

3409
  """
3410
  return "originstname+%s" % instance.name
3411

    
3412

    
3413
def _CreateDisks(lu, instance):
3414
  """Create all disks for an instance.
3415

3416
  This abstracts away some work from AddInstance.
3417

3418
  @type lu: L{LogicalUnit}
3419
  @param lu: the logical unit on whose behalf we execute
3420
  @type instance: L{objects.Instance}
3421
  @param instance: the instance whose disks we should create
3422
  @rtype: boolean
3423
  @return: the success of the creation
3424

3425
  """
3426
  info = _GetInstanceInfoText(instance)
3427

    
3428
  if instance.disk_template == constants.DT_FILE:
3429
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3430
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3431
                                                 file_storage_dir)
3432

    
3433
    if result.failed or not result.data:
3434
      logging.error("Could not connect to node '%s'", instance.primary_node)
3435
      return False
3436

    
3437
    if not result.data[0]:
3438
      logging.error("Failed to create directory '%s'", file_storage_dir)
3439
      return False
3440

    
3441
  # Note: this needs to be kept in sync with adding of disks in
3442
  # LUSetInstanceParams
3443
  for device in instance.disks:
3444
    logging.info("Creating volume %s for instance %s",
3445
                 device.iv_name, instance.name)
3446
    #HARDCODE
3447
    for secondary_node in instance.secondary_nodes:
3448
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3449
                                        device, False, info):
3450
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3451
                      device.iv_name, device, secondary_node)
3452
        return False
3453
    #HARDCODE
3454
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3455
                                    instance, device, info):
3456
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3457
      return False
3458

    
3459
  return True
3460

    
3461

    
3462
def _RemoveDisks(lu, instance):
3463
  """Remove all disks for an instance.
3464

3465
  This abstracts away some work from `AddInstance()` and
3466
  `RemoveInstance()`. Note that in case some of the devices couldn't
3467
  be removed, the removal will continue with the other ones (compare
3468
  with `_CreateDisks()`).
3469

3470
  @type lu: L{LogicalUnit}
3471
  @param lu: the logical unit on whose behalf we execute
3472
  @type instance: L{objects.Instance}
3473
  @param instance: the instance whose disks we should remove
3474
  @rtype: boolean
3475
  @return: the success of the removal
3476

3477
  """
3478
  logging.info("Removing block devices for instance %s", instance.name)
3479

    
3480
  result = True
3481
  for device in instance.disks:
3482
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3483
      lu.cfg.SetDiskID(disk, node)
3484
      result = lu.rpc.call_blockdev_remove(node, disk)
3485
      if result.failed or not result.data:
3486
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3487
                           " continuing anyway", device.iv_name, node)
3488
        result = False
3489

    
3490
  if instance.disk_template == constants.DT_FILE:
3491
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3492
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3493
                                                 file_storage_dir)
3494
    if result.failed or not result.data:
3495
      logging.error("Could not remove directory '%s'", file_storage_dir)
3496
      result = False
3497

    
3498
  return result
3499

    
3500

    
3501
def _ComputeDiskSize(disk_template, disks):
3502
  """Compute disk size requirements in the volume group
3503

3504
  """
3505
  # Required free disk space as a function of the disk template and disk sizes
3506
  req_size_dict = {
3507
    constants.DT_DISKLESS: None,
3508
    constants.DT_PLAIN: sum(d["size"] for d in disks),
3509
    # 128 MB are added for drbd metadata for each disk
3510
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3511
    constants.DT_FILE: None,
3512
  }
3513

    
3514
  if disk_template not in req_size_dict:
3515
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3516
                                 " is unknown" %  disk_template)
3517

    
3518
  return req_size_dict[disk_template]
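# Worked example for the table above: for two disks of 1024 MB and 2048 MB,
# DT_PLAIN needs 1024 + 2048 = 3072 MB free in the volume group, DT_DRBD8
# needs (1024 + 128) + (2048 + 128) = 3328 MB because of the per-disk
# metadata volume, and DT_DISKLESS/DT_FILE need no LVM space at all (None).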
3519

    
3520

    
3521
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3522
  """Hypervisor parameter validation.
3523

3524
  This function abstracts the hypervisor parameter validation to be
3525
  used in both instance create and instance modify.
3526

3527
  @type lu: L{LogicalUnit}
3528
  @param lu: the logical unit for which we check
3529
  @type nodenames: list
3530
  @param nodenames: the list of nodes on which we should check
3531
  @type hvname: string
3532
  @param hvname: the name of the hypervisor we should use
3533
  @type hvparams: dict
3534
  @param hvparams: the parameters which we need to check
3535
  @raise errors.OpPrereqError: if the parameters are not valid
3536

3537
  """
3538
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3539
                                                  hvname,
3540
                                                  hvparams)
3541
  for node in nodenames:
3542
    info = hvinfo[node]
3543
    info.Raise()
3544
    if not info.data or not isinstance(info.data, (tuple, list)):
3545
      raise errors.OpPrereqError("Cannot get current information"
3546
                                 " from node '%s' (%s)" % (node, info.data))
3547
    if not info.data[0]:
3548
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3549
                                 " %s" % info.data[1])
3550

    
3551

    
3552
class LUCreateInstance(LogicalUnit):
3553
  """Create an instance.
3554

3555
  """
3556
  HPATH = "instance-add"
3557
  HTYPE = constants.HTYPE_INSTANCE
3558
  _OP_REQP = ["instance_name", "disks", "disk_template",
3559
              "mode", "start",
3560
              "wait_for_sync", "ip_check", "nics",
3561
              "hvparams", "beparams"]
3562
  REQ_BGL = False
3563

    
3564
  def _ExpandNode(self, node):
3565
    """Expands and checks one node name.
3566

3567
    """
3568
    node_full = self.cfg.ExpandNodeName(node)
3569
    if node_full is None:
3570
      raise errors.OpPrereqError("Unknown node %s" % node)
3571
    return node_full
3572

    
3573
  def ExpandNames(self):
3574
    """ExpandNames for CreateInstance.
3575

3576
    Figure out the right locks for instance creation.
3577

3578
    """
3579
    self.needed_locks = {}
3580

    
3581
    # set optional parameters to none if they don't exist
3582
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3583
      if not hasattr(self.op, attr):
3584
        setattr(self.op, attr, None)
3585

    
3586
    # cheap checks, mostly valid constants given
3587

    
3588
    # verify creation mode
3589
    if self.op.mode not in (constants.INSTANCE_CREATE,
3590
                            constants.INSTANCE_IMPORT):
3591
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3592
                                 self.op.mode)
3593

    
3594
    # disk template and mirror node verification
3595
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3596
      raise errors.OpPrereqError("Invalid disk template name")
3597

    
3598
    if self.op.hypervisor is None:
3599
      self.op.hypervisor = self.cfg.GetHypervisorType()
3600

    
3601
    cluster = self.cfg.GetClusterInfo()
3602
    enabled_hvs = cluster.enabled_hypervisors
3603
    if self.op.hypervisor not in enabled_hvs:
3604
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3605
                                 " cluster (%s)" % (self.op.hypervisor,
3606
                                  ",".join(enabled_hvs)))
3607

    
3608
    # check hypervisor parameter syntax (locally)
3609

    
3610
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3611
                                  self.op.hvparams)
3612
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3613
    hv_type.CheckParameterSyntax(filled_hvp)
3614

    
3615
    # fill and remember the beparams dict
3616
    utils.CheckBEParams(self.op.beparams)
3617
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3618
                                    self.op.beparams)
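    # FillDict presumably copies the cluster-wide defaults and overlays the
    # per-instance values, so with cluster defaults such as
    # {"memory": 128, "vcpus": 1, "auto_balance": True} and an opcode giving
    # only {"memory": 512}, be_full would end up as
    # {"memory": 512, "vcpus": 1, "auto_balance": True} (illustrative keys
    # and values).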
3619

    
3620
    #### instance parameters check
3621

    
3622
    # instance name verification
3623
    hostname1 = utils.HostInfo(self.op.instance_name)
3624
    self.op.instance_name = instance_name = hostname1.name
3625

    
3626
    # this is just a preventive check, but someone might still add this
3627
    # instance in the meantime, and creation will fail at lock-add time
3628
    if instance_name in self.cfg.GetInstanceList():
3629
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3630
                                 instance_name)
3631

    
3632
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3633

    
3634
    # NIC buildup
3635
    self.nics = []
3636
    for nic in self.op.nics:
3637
      # ip validity checks
3638
      ip = nic.get("ip", None)
3639
      if ip is None or ip.lower() == "none":
3640
        nic_ip = None
3641
      elif ip.lower() == constants.VALUE_AUTO:
3642
        nic_ip = hostname1.ip
3643
      else:
3644
        if not utils.IsValidIP(ip):
3645
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3646
                                     " like a valid IP" % ip)
3647
        nic_ip = ip
3648

    
3649
      # MAC address verification
3650
      mac = nic.get("mac", constants.VALUE_AUTO)
3651
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3652
        if not utils.IsValidMac(mac.lower()):
3653
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3654
                                     mac)
3655
      # bridge verification
3656
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3657
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3658

    
3659
    # disk checks/pre-build
3660
    self.disks = []
3661
    for disk in self.op.disks:
3662
      mode = disk.get("mode", constants.DISK_RDWR)
3663
      if mode not in constants.DISK_ACCESS_SET:
3664
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3665
                                   mode)
3666
      size = disk.get("size", None)
3667
      if size is None:
3668
        raise errors.OpPrereqError("Missing disk size")
3669
      try:
3670
        size = int(size)
3671
      except ValueError:
3672
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3673
      self.disks.append({"size": size, "mode": mode})
3674

    
3675
    # used in CheckPrereq for ip ping check
3676
    self.check_ip = hostname1.ip
3677

    
3678
    # file storage checks
3679
    if (self.op.file_driver and
3680
        not self.op.file_driver in constants.FILE_DRIVER):
3681
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3682
                                 self.op.file_driver)
3683

    
3684
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3685
      raise errors.OpPrereqError("File storage directory path not absolute")
3686

    
3687
    ### Node/iallocator related checks
3688
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3689
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3690
                                 " node must be given")
3691

    
3692
    if self.op.iallocator:
3693
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3694
    else:
3695
      self.op.pnode = self._ExpandNode(self.op.pnode)
3696
      nodelist = [self.op.pnode]
3697
      if self.op.snode is not None:
3698
        self.op.snode = self._ExpandNode(self.op.snode)
3699
        nodelist.append(self.op.snode)
3700
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3701

    
3702
    # in case of import lock the source node too
3703
    if self.op.mode == constants.INSTANCE_IMPORT:
3704
      src_node = getattr(self.op, "src_node", None)
3705
      src_path = getattr(self.op, "src_path", None)
3706

    
3707
      if src_path is None:
3708
        self.op.src_path = src_path = self.op.instance_name
3709

    
3710
      if src_node is None:
3711
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3712
        self.op.src_node = None
3713
        if os.path.isabs(src_path):
3714
          raise errors.OpPrereqError("Importing an instance from an absolute"
3715
                                     " path requires a source node option.")
3716
      else:
3717
        self.op.src_node = src_node = self._ExpandNode(src_node)
3718
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3719
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
3720
        if not os.path.isabs(src_path):
3721
          self.op.src_path = src_path = \
3722
            os.path.join(constants.EXPORT_DIR, src_path)
3723

    
3724
    else: # INSTANCE_CREATE
3725
      if getattr(self.op, "os_type", None) is None:
3726
        raise errors.OpPrereqError("No guest OS specified")
3727

    
3728
  def _RunAllocator(self):
3729
    """Run the allocator based on input opcode.
3730

3731
    """
3732
    nics = [n.ToDict() for n in self.nics]
3733
    ial = IAllocator(self,
3734
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3735
                     name=self.op.instance_name,
3736
                     disk_template=self.op.disk_template,
3737
                     tags=[],
3738
                     os=self.op.os_type,
3739
                     vcpus=self.be_full[constants.BE_VCPUS],
3740
                     mem_size=self.be_full[constants.BE_MEMORY],
3741
                     disks=self.disks,
3742
                     nics=nics,
3743
                     hypervisor=self.op.hypervisor,
3744
                     )
3745

    
3746
    ial.Run(self.op.iallocator)
3747

    
3748
    if not ial.success:
3749
      raise errors.OpPrereqError("Can't compute nodes using"
3750
                                 " iallocator '%s': %s" % (self.op.iallocator,
3751
                                                           ial.info))
3752
    if len(ial.nodes) != ial.required_nodes:
3753
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3754
                                 " of nodes (%s), required %s" %
3755
                                 (self.op.iallocator, len(ial.nodes),
3756
                                  ial.required_nodes))
3757
    self.op.pnode = ial.nodes[0]
3758
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3759
                 self.op.instance_name, self.op.iallocator,
3760
                 ", ".join(ial.nodes))
3761
    if ial.required_nodes == 2:
3762
      self.op.snode = ial.nodes[1]
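    # In other words, a successful run is expected to come back with
    # ial.success == True, ial.required_nodes equal to 1 or 2 depending on
    # the disk template, and ial.nodes holding that many node names, e.g.
    # ["node1.example.com", "node2.example.com"] (placeholder names) for a
    # mirrored template.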
3763

    
3764
  def BuildHooksEnv(self):
3765
    """Build hooks env.
3766

3767
    This runs on master, primary and secondary nodes of the instance.
3768

3769
    """
3770
    env = {
3771
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3772
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3773
      "INSTANCE_ADD_MODE": self.op.mode,
3774
      }
3775
    if self.op.mode == constants.INSTANCE_IMPORT:
3776
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3777
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3778
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3779

    
3780
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3781
      primary_node=self.op.pnode,
3782
      secondary_nodes=self.secondaries,
3783
      status=self.instance_status,
3784
      os_type=self.op.os_type,
3785
      memory=self.be_full[constants.BE_MEMORY],
3786
      vcpus=self.be_full[constants.BE_VCPUS],
3787
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3788
    ))
3789

    
3790
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3791
          self.secondaries)
3792
    return env, nl, nl
3793

    
3794

    
3795
  def CheckPrereq(self):
3796
    """Check prerequisites.
3797

3798
    """
3799
    if (not self.cfg.GetVGName() and
3800
        self.op.disk_template not in constants.DTS_NOT_LVM):
3801
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3802
                                 " instances")
3803

    
3804

    
3805
    if self.op.mode == constants.INSTANCE_IMPORT:
3806
      src_node = self.op.src_node
3807
      src_path = self.op.src_path
3808

    
3809
      if src_node is None:
3810
        exp_list = self.rpc.call_export_list(
3811
          self.acquired_locks[locking.LEVEL_NODE])
3812
        found = False
3813
        for node in exp_list:
3814
          if not exp_list[node].failed and src_path in exp_list[node].data:
3815
            found = True
3816
            self.op.src_node = src_node = node
3817
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3818
                                                       src_path)
3819
            break
3820
        if not found:
3821
          raise errors.OpPrereqError("No export found for relative path %s" %
3822
                                      src_path)
3823

    
3824
      result = self.rpc.call_export_info(src_node, src_path)
3825
      result.Raise()
3826
      if not result.data:
3827
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3828

    
3829
      export_info = result.data
3830
      if not export_info.has_section(constants.INISECT_EXP):
3831
        raise errors.ProgrammerError("Corrupted export config")
3832

    
3833
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3834
      if (int(ei_version) != constants.EXPORT_VERSION):
3835
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3836
                                   (ei_version, constants.EXPORT_VERSION))
3837

    
3838
      # Check that the new instance doesn't have less disks than the export
3839
      instance_disks = len(self.disks)
3840
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3841
      if instance_disks < export_disks:
3842
        raise errors.OpPrereqError("Not enough disks to import."
3843
                                   " (instance: %d, export: %d)" %
3844
                                   (instance_disks, export_disks))
3845

    
3846
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3847
      disk_images = []
3848
      for idx in range(export_disks):
3849
        option = 'disk%d_dump' % idx
3850
        if export_info.has_option(constants.INISECT_INS, option):
3851
          # FIXME: are the old OSes, disk sizes, etc. useful?
3852
          export_name = export_info.get(constants.INISECT_INS, option)
3853
          image = os.path.join(src_path, export_name)
3854
          disk_images.append(image)
3855
        else:
3856
          disk_images.append(False)
3857

    
3858
      self.src_images = disk_images
3859

    
3860
      old_name = export_info.get(constants.INISECT_INS, 'name')
3861
      # FIXME: int() here could throw a ValueError on broken exports
3862
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3863
      if self.op.instance_name == old_name:
3864
        for idx, nic in enumerate(self.nics):
3865
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3866
            nic_mac_ini = 'nic%d_mac' % idx
3867
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3868

    
3869
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3870
    if self.op.start and not self.op.ip_check:
3871
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3872
                                 " adding an instance in start mode")
3873

    
3874
    if self.op.ip_check:
3875
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3876
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3877
                                   (self.check_ip, self.op.instance_name))
3878

    
3879
    #### allocator run
3880

    
3881
    if self.op.iallocator is not None:
3882
      self._RunAllocator()
3883

    
3884
    #### node related checks
3885

    
3886
    # check primary node
3887
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3888
    assert self.pnode is not None, \
3889
      "Cannot retrieve locked node %s" % self.op.pnode
3890
    self.secondaries = []
3891

    
3892
    # mirror node verification
3893
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3894
      if self.op.snode is None:
3895
        raise errors.OpPrereqError("The networked disk templates need"
3896
                                   " a mirror node")
3897
      if self.op.snode == pnode.name:
3898
        raise errors.OpPrereqError("The secondary node cannot be"
3899
                                   " the primary node.")
3900
      self.secondaries.append(self.op.snode)
3901

    
3902
    nodenames = [pnode.name] + self.secondaries
3903

    
3904
    req_size = _ComputeDiskSize(self.op.disk_template,
3905
                                self.disks)
3906

    
3907
    # Check lv size requirements
3908
    if req_size is not None:
3909
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3910
                                         self.op.hypervisor)
3911
      for node in nodenames:
3912
        info = nodeinfo[node]
3913
        info.Raise()
3914
        info = info.data
3915
        if not info:
3916
          raise errors.OpPrereqError("Cannot get current information"
3917
                                     " from node '%s'" % node)
3918
        vg_free = info.get('vg_free', None)
3919
        if not isinstance(vg_free, int):
3920
          raise errors.OpPrereqError("Can't compute free disk space on"
3921
                                     " node %s" % node)
3922
        if req_size > info['vg_free']:
3923
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3924
                                     " %d MB available, %d MB required" %
3925
                                     (node, info['vg_free'], req_size))
3926

    
3927
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3928

    
3929
    # os verification
3930
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3931
    result.Raise()
3932
    if not isinstance(result.data, objects.OS):
3933
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3934
                                 " primary node"  % self.op.os_type)
3935

    
3936
    # bridge check on primary node
3937
    bridges = [n.bridge for n in self.nics]
3938
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3939
    result.Raise()
3940
    if not result.data:
3941
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
3942
                                 " exist on destination node '%s'" %
3943
                                 (",".join(bridges), pnode.name))
3944

    
3945
    # memory check on primary node
3946
    if self.op.start:
3947
      _CheckNodeFreeMemory(self, self.pnode.name,
3948
                           "creating instance %s" % self.op.instance_name,
3949
                           self.be_full[constants.BE_MEMORY],
3950
                           self.op.hypervisor)
3951

    
3952
    if self.op.start:
3953
      self.instance_status = 'up'
3954
    else:
3955
      self.instance_status = 'down'
3956

    
3957
  def Exec(self, feedback_fn):
3958
    """Create and add the instance to the cluster.
3959

3960
    """
3961
    instance = self.op.instance_name
3962
    pnode_name = self.pnode.name
3963

    
3964
    for nic in self.nics:
3965
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3966
        nic.mac = self.cfg.GenerateMAC()
3967

    
3968
    ht_kind = self.op.hypervisor
3969
    if ht_kind in constants.HTS_REQ_PORT:
3970
      network_port = self.cfg.AllocatePort()
3971
    else:
3972
      network_port = None
3973

    
3974
    ##if self.op.vnc_bind_address is None:
3975
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3976

    
3977
    # this is needed because os.path.join does not accept None arguments
3978
    if self.op.file_storage_dir is None:
3979
      string_file_storage_dir = ""
3980
    else:
3981
      string_file_storage_dir = self.op.file_storage_dir
3982

    
3983
    # build the full file storage dir path
3984
    file_storage_dir = os.path.normpath(os.path.join(
3985
                                        self.cfg.GetFileStorageDir(),
3986
                                        string_file_storage_dir, instance))
3987

    
3988

    
3989
    disks = _GenerateDiskTemplate(self,
3990
                                  self.op.disk_template,
3991
                                  instance, pnode_name,
3992
                                  self.secondaries,
3993
                                  self.disks,
3994
                                  file_storage_dir,
3995
                                  self.op.file_driver,
3996
                                  0)
3997

    
3998
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3999
                            primary_node=pnode_name,
4000
                            nics=self.nics, disks=disks,
4001
                            disk_template=self.op.disk_template,
4002
                            status=self.instance_status,
4003
                            network_port=network_port,
4004
                            beparams=self.op.beparams,
4005
                            hvparams=self.op.hvparams,
4006
                            hypervisor=self.op.hypervisor,
4007
                            )
4008

    
4009
    feedback_fn("* creating instance disks...")
4010
    if not _CreateDisks(self, iobj):
4011
      _RemoveDisks(self, iobj)
4012
      self.cfg.ReleaseDRBDMinors(instance)
4013
      raise errors.OpExecError("Device creation failed, reverting...")
4014

    
4015
    feedback_fn("adding instance %s to cluster config" % instance)
4016

    
4017
    self.cfg.AddInstance(iobj)
4018
    # Declare that we don't want to remove the instance lock anymore, as we've
4019
    # added the instance to the config
4020
    del self.remove_locks[locking.LEVEL_INSTANCE]
4021
    # Remove the temporary assignments for the instance's DRBDs
4022
    self.cfg.ReleaseDRBDMinors(instance)
4023
    # Unlock all the nodes
4024
    if self.op.mode == constants.INSTANCE_IMPORT:
4025
      nodes_keep = [self.op.src_node]
4026
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4027
                       if node != self.op.src_node]
4028
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4029
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4030
    else:
4031
      self.context.glm.release(locking.LEVEL_NODE)
4032
      del self.acquired_locks[locking.LEVEL_NODE]
4033

    
4034
    if self.op.wait_for_sync:
4035
      disk_abort = not _WaitForSync(self, iobj)
4036
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4037
      # make sure the disks are not degraded (still sync-ing is ok)
4038
      time.sleep(15)
4039
      feedback_fn("* checking mirrors status")
4040
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4041
    else:
4042
      disk_abort = False
4043

    
4044
    if disk_abort:
4045
      _RemoveDisks(self, iobj)
4046
      self.cfg.RemoveInstance(iobj.name)
4047
      # Make sure the instance lock gets removed
4048
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4049
      raise errors.OpExecError("There are some degraded disks for"
4050
                               " this instance")
4051

    
4052
    feedback_fn("creating os for instance %s on node %s" %
4053
                (instance, pnode_name))
4054

    
4055
    if iobj.disk_template != constants.DT_DISKLESS:
4056
      if self.op.mode == constants.INSTANCE_CREATE:
4057
        feedback_fn("* running the instance OS create scripts...")
4058
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4059
        result.Raise()
4060
        if not result.data:
4061
          raise errors.OpExecError("Could not add os for instance %s"
4062
                                   " on node %s" %
4063
                                   (instance, pnode_name))
4064

    
4065
      elif self.op.mode == constants.INSTANCE_IMPORT:
4066
        feedback_fn("* running the instance OS import scripts...")
4067
        src_node = self.op.src_node
4068
        src_images = self.src_images
4069
        cluster_name = self.cfg.GetClusterName()
4070
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4071
                                                         src_node, src_images,
4072
                                                         cluster_name)
4073
        import_result.Raise()
4074
        for idx, result in enumerate(import_result.data):
4075
          if not result:
4076
            self.LogWarning("Could not import the image %s for instance"
4077
                            " %s, disk %d, on node %s" %
4078
                            (src_images[idx], instance, idx, pnode_name))
4079
      else:
4080
        # also checked in the prereq part
4081
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4082
                                     % self.op.mode)
4083

    
4084
    if self.op.start:
4085
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4086
      feedback_fn("* starting instance...")
4087
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
4088
      result.Raise()
4089
      if not result.data:
4090
        raise errors.OpExecError("Could not start instance")
4091

    
4092

    
4093
class LUConnectConsole(NoHooksLU):
4094
  """Connect to an instance's console.
4095

4096
  This is somewhat special in that it returns the command line that
4097
  you need to run on the master node in order to connect to the
4098
  console.
4099

4100
  """
4101
  _OP_REQP = ["instance_name"]
4102
  REQ_BGL = False
4103

    
4104
  def ExpandNames(self):
4105
    self._ExpandAndLockInstance()
4106

    
4107
  def CheckPrereq(self):
4108
    """Check prerequisites.
4109

4110
    This checks that the instance is in the cluster.
4111

4112
    """
4113
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4114
    assert self.instance is not None, \
4115
      "Cannot retrieve locked instance %s" % self.op.instance_name
4116

    
4117
  def Exec(self, feedback_fn):
4118
    """Connect to the console of an instance
4119

4120
    """
4121
    instance = self.instance
4122
    node = instance.primary_node
4123

    
4124
    node_insts = self.rpc.call_instance_list([node],
4125
                                             [instance.hypervisor])[node]
4126
    node_insts.Raise()
4127

    
4128
    if instance.name not in node_insts.data:
4129
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4130

    
4131
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4132

    
4133
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4134
    console_cmd = hyper.GetShellCommandForConsole(instance)
4135

    
4136
    # build ssh cmdline
4137
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4138

    
4139

    
4140
class LUReplaceDisks(LogicalUnit):
4141
  """Replace the disks of an instance.
4142

4143
  """
4144
  HPATH = "mirrors-replace"
4145
  HTYPE = constants.HTYPE_INSTANCE
4146
  _OP_REQP = ["instance_name", "mode", "disks"]
4147
  REQ_BGL = False
4148

    
4149
  def ExpandNames(self):
4150
    self._ExpandAndLockInstance()
4151

    
4152
    if not hasattr(self.op, "remote_node"):
4153
      self.op.remote_node = None
4154

    
4155
    ia_name = getattr(self.op, "iallocator", None)
4156
    if ia_name is not None:
4157
      if self.op.remote_node is not None:
4158
        raise errors.OpPrereqError("Give either the iallocator or the new"
4159
                                   " secondary, not both")
4160
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4161
    elif self.op.remote_node is not None:
4162
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4163
      if remote_node is None:
4164
        raise errors.OpPrereqError("Node '%s' not known" %
4165
                                   self.op.remote_node)
4166
      self.op.remote_node = remote_node
4167
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4168
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4169
    else:
4170
      self.needed_locks[locking.LEVEL_NODE] = []
4171
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4172

    
4173
  def DeclareLocks(self, level):
4174
    # If we're not already locking all nodes in the set we have to declare the
4175
    # instance's primary/secondary nodes.
4176
    if (level == locking.LEVEL_NODE and
4177
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4178
      self._LockInstancesNodes()
4179

    
4180
  def _RunAllocator(self):
4181
    """Compute a new secondary node using an IAllocator.
4182

4183
    """
4184
    ial = IAllocator(self,
4185
                     mode=constants.IALLOCATOR_MODE_RELOC,
4186
                     name=self.op.instance_name,
4187
                     relocate_from=[self.sec_node])
4188

    
4189
    ial.Run(self.op.iallocator)
4190

    
4191
    if not ial.success:
4192
      raise errors.OpPrereqError("Can't compute nodes using"
4193
                                 " iallocator '%s': %s" % (self.op.iallocator,
4194
                                                           ial.info))
4195
    if len(ial.nodes) != ial.required_nodes:
4196
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4197
                                 " of nodes (%s), required %s" %
4198
                                 (len(ial.nodes), ial.required_nodes))
4199
    self.op.remote_node = ial.nodes[0]
4200
    self.LogInfo("Selected new secondary for the instance: %s",
4201
                 self.op.remote_node)
4202

    
4203
  def BuildHooksEnv(self):
4204
    """Build hooks env.
4205

4206
    This runs on the master, the primary and all the secondaries.
4207

4208
    """
4209
    env = {
4210
      "MODE": self.op.mode,
4211
      "NEW_SECONDARY": self.op.remote_node,
4212
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4213
      }
4214
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4215
    nl = [
4216
      self.cfg.GetMasterNode(),
4217
      self.instance.primary_node,
4218
      ]
4219
    if self.op.remote_node is not None:
4220
      nl.append(self.op.remote_node)
4221
    return env, nl, nl
4222

    
4223
  def CheckPrereq(self):
4224
    """Check prerequisites.
4225

4226
    This checks that the instance is in the cluster.
4227

4228
    """
4229
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4230
    assert instance is not None, \
4231
      "Cannot retrieve locked instance %s" % self.op.instance_name
4232
    self.instance = instance
4233

    
4234
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4235
      raise errors.OpPrereqError("Instance's disk layout is not"
4236
                                 " network mirrored.")
4237

    
4238
    if len(instance.secondary_nodes) != 1:
4239
      raise errors.OpPrereqError("The instance has a strange layout,"
4240
                                 " expected one secondary but found %d" %
4241
                                 len(instance.secondary_nodes))
4242

    
4243
    self.sec_node = instance.secondary_nodes[0]
4244

    
4245
    ia_name = getattr(self.op, "iallocator", None)
4246
    if ia_name is not None:
4247
      self._RunAllocator()
4248

    
4249
    remote_node = self.op.remote_node
4250
    if remote_node is not None:
4251
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4252
      assert self.remote_node_info is not None, \
4253
        "Cannot retrieve locked node %s" % remote_node
4254
    else:
4255
      self.remote_node_info = None
4256
    if remote_node == instance.primary_node:
4257
      raise errors.OpPrereqError("The specified node is the primary node of"
4258
                                 " the instance.")
4259
    elif remote_node == self.sec_node:
4260
      if self.op.mode == constants.REPLACE_DISK_SEC:
4261
        # this is for DRBD8, where we can't execute the same mode of
4262
        # replacement as for drbd7 (no different port allocated)
4263
        raise errors.OpPrereqError("Same secondary given, cannot execute"
4264
                                   " replacement")
4265
    if instance.disk_template == constants.DT_DRBD8:
4266
      if (self.op.mode == constants.REPLACE_DISK_ALL and
4267
          remote_node is not None):
4268
        # switch to replace secondary mode
4269
        self.op.mode = constants.REPLACE_DISK_SEC
4270

    
4271
      if self.op.mode == constants.REPLACE_DISK_ALL:
4272
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4273
                                   " secondary disk replacement, not"
4274
                                   " both at once")
4275
      elif self.op.mode == constants.REPLACE_DISK_PRI:
4276
        if remote_node is not None:
4277
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4278
                                     " the secondary while doing a primary"
4279
                                     " node disk replacement")
4280
        self.tgt_node = instance.primary_node
4281
        self.oth_node = instance.secondary_nodes[0]
4282
      elif self.op.mode == constants.REPLACE_DISK_SEC:
4283
        self.new_node = remote_node # this can be None, in which case
4284
                                    # we don't change the secondary
4285
        self.tgt_node = instance.secondary_nodes[0]
4286
        self.oth_node = instance.primary_node
4287
      else:
4288
        raise errors.ProgrammerError("Unhandled disk replace mode")
4289

    
4290
    if not self.op.disks:
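      # An empty disk list in the opcode means "replace all disks".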
4291
      self.op.disks = range(len(instance.disks))
4292

    
4293
    for disk_idx in self.op.disks:
4294
      instance.FindDisk(disk_idx)
4295

    
4296
  def _ExecD8DiskOnly(self, feedback_fn):
4297
    """Replace a disk on the primary or secondary for dbrd8.
4298

4299
    The algorithm for replace is quite complicated:
4300

4301
      1. for each disk to be replaced:
4302

4303
        1. create new LVs on the target node with unique names
4304
        1. detach old LVs from the drbd device
4305
        1. rename old LVs to name_replaced.<time_t>
4306
        1. rename new LVs to old LVs
4307
        1. attach the new LVs (with the old names now) to the drbd device
4308

4309
      1. wait for sync across all devices
4310

4311
      1. for each modified disk:
4312

4313
        1. remove old LVs (which have the name name_replaced.<time_t>)
4314

4315
    Failures are not very well handled.
4316

4317
    """
4318
    steps_total = 6
4319
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4320
    instance = self.instance
4321
    iv_names = {}
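    # iv_names maps each DRBD device's iv_name to a (dev, old_lvs, new_lvs)
    # tuple; it is filled in step 3 and walked again in steps 4-6.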
4322
    vgname = self.cfg.GetVGName()
4323
    # start of work
4324
    cfg = self.cfg
4325
    tgt_node = self.tgt_node
4326
    oth_node = self.oth_node
4327

    
4328
    # Step: check device activation
4329
    self.proc.LogStep(1, steps_total, "check device existence")
4330
    info("checking volume groups")
4331
    my_vg = cfg.GetVGName()
4332
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4333
    if not results:
4334
      raise errors.OpExecError("Can't list volume groups on the nodes")
4335
    for node in oth_node, tgt_node:
4336
      res = results[node]
4337
      if res.failed or not res.data or my_vg not in res.data:
4338
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4339
                                 (my_vg, node))
4340
    for idx, dev in enumerate(instance.disks):
4341
      if idx not in self.op.disks:
4342
        continue
4343
      for node in tgt_node, oth_node:
4344
        info("checking disk/%d on %s" % (idx, node))
4345
        cfg.SetDiskID(dev, node)
4346
        if not self.rpc.call_blockdev_find(node, dev):
4347
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4348
                                   (idx, node))
4349

    
4350
    # Step: check other node consistency
4351
    self.proc.LogStep(2, steps_total, "check peer consistency")
4352
    for idx, dev in enumerate(instance.disks):
4353
      if idx not in self.op.disks:
4354
        continue
4355
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4356
      if not _CheckDiskConsistency(self, dev, oth_node,
4357
                                   oth_node==instance.primary_node):
4358
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4359
                                 " to replace disks on this node (%s)" %
4360
                                 (oth_node, tgt_node))
4361

    
4362
    # Step: create new storage
4363
    self.proc.LogStep(3, steps_total, "allocate new storage")
4364
    for idx, dev in enumerate(instance.disks):
4365
      if idx not in self.op.disks:
4366
        continue
4367
      size = dev.size
4368
      cfg.SetDiskID(dev, tgt_node)
4369
      lv_names = [".disk%d_%s" % (idx, suf)
4370
                  for suf in ["data", "meta"]]
4371
      names = _GenerateUniqueNames(self, lv_names)
4372
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4373
                             logical_id=(vgname, names[0]))
4374
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4375
                             logical_id=(vgname, names[1]))
4376
      new_lvs = [lv_data, lv_meta]
4377
      old_lvs = dev.children
4378
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4379
      info("creating new local storage on %s for %s" %
4380
           (tgt_node, dev.iv_name))
4381
      # since we *always* want to create this LV, we use the
4382
      # _Create...OnPrimary (which forces the creation), even if we
4383
      # are talking about the secondary node
4384
      for new_lv in new_lvs:
4385
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4386
                                        _GetInstanceInfoText(instance)):
4387
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4388
                                   " node '%s'" %
4389
                                   (new_lv.logical_id[1], tgt_node))
4390

    
4391
    # Step: for each lv, detach+rename*2+attach
4392
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4393
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4394
      info("detaching %s drbd from local storage" % dev.iv_name)
4395
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4396
      result.Raise()
4397
      if not result.data:
4398
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4399
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4400
      #dev.children = []
4401
      #cfg.Update(instance)
4402

    
4403
      # ok, we created the new LVs, so now we know we have the needed
4404
      # storage; as such, we proceed on the target node to rename
4405
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4406
      # using the assumption that logical_id == physical_id (which in
4407
      # turn is the unique_id on that node)
4408

    
4409
      # FIXME(iustin): use a better name for the replaced LVs
4410
      temp_suffix = int(time.time())
4411
      ren_fn = lambda d, suff: (d.physical_id[0],
4412
                                d.physical_id[1] + "_replaced-%s" % suff)
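      # For example (illustrative name only), an LV "xyz.disk0_data" would be
      # renamed to "xyz.disk0_data_replaced-1234567890" and kept around until
      # step 6 removes it.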
4413
      # build the rename list based on what LVs exist on the node
4414
      rlist = []
4415
      for to_ren in old_lvs:
4416
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4417
        if not find_res.failed and find_res.data is not None: # device exists
4418
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4419

    
4420
      info("renaming the old LVs on the target node")
4421
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4422
      result.Raise()
4423
      if not result.data:
4424
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4425
      # now we rename the new LVs to the old LVs
4426
      info("renaming the new LVs on the target node")
4427
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4428
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4429
      result.Raise()
4430
      if not result.data:
4431
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4432

    
4433
      for old, new in zip(old_lvs, new_lvs):
4434
        new.logical_id = old.logical_id
4435
        cfg.SetDiskID(new, tgt_node)
4436

    
4437
      for disk in old_lvs:
4438
        disk.logical_id = ren_fn(disk, temp_suffix)
4439
        cfg.SetDiskID(disk, tgt_node)
4440

    
4441
      # now that the new lvs have the old name, we can add them to the device
4442
      info("adding new mirror component on %s" % tgt_node)
4443
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4444
      if result.failed or not result.data:
4445
        for new_lv in new_lvs:
4446
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4447
          if result.failed or not result.data:
4448
            warning("Can't rollback device %s", hint="manually cleanup unused"
4449
                    " logical volumes")
4450
        raise errors.OpExecError("Can't add local storage to drbd")
4451

    
4452
      dev.children = new_lvs
4453
      cfg.Update(instance)
4454

    
4455
    # Step: wait for sync
4456

    
4457
    # this can fail as the old devices are degraded and _WaitForSync
4458
    # does a combined result over all disks, so we don't check its
4459
    # return value
4460
    self.proc.LogStep(5, steps_total, "sync devices")
4461
    _WaitForSync(self, instance, unlock=True)
4462

    
4463
    # so check manually all the devices
4464
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4465
      cfg.SetDiskID(dev, instance.primary_node)
4466
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4467
      if result.failed or result.data[5]:
4468
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4469

    
4470
    # Step: remove old storage
4471
    self.proc.LogStep(6, steps_total, "removing old storage")
4472
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4473
      info("remove logical volumes for %s" % name)
4474
      for lv in old_lvs:
4475
        cfg.SetDiskID(lv, tgt_node)
4476
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
4477
        if result.failed or not result.data:
4478
          warning("Can't remove old LV", hint="manually remove unused LVs")
4479
          continue
4480

    
4481
  def _ExecD8Secondary(self, feedback_fn):
4482
    """Replace the secondary node for drbd8.
4483

4484
    The algorithm for replace is quite complicated:
4485
      - for all disks of the instance:
4486
        - create new LVs on the new node with same names
4487
        - shutdown the drbd device on the old secondary
4488
        - disconnect the drbd network on the primary
4489
        - create the drbd device on the new secondary
4490
        - network attach the drbd on the primary, using an artifice:
4491
          the drbd code for Attach() will connect to the network if it
4492
          finds a device which is connected to the good local disks but
4493
          not network enabled
4494
      - wait for sync across all devices
4495
      - remove all disks from the old secondary
4496

4497
    Failures are not very well handled.
4498

4499
    """
4500
    steps_total = 6
4501
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4502
    instance = self.instance
4503
    iv_names = {}
4504
    vgname = self.cfg.GetVGName()
4505
    # start of work
4506
    cfg = self.cfg
4507
    old_node = self.tgt_node
4508
    new_node = self.new_node
4509
    pri_node = instance.primary_node
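    # Roles in this function: old_node is the secondary being replaced,
    # new_node is its replacement, pri_node stays untouched.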
4510

    
4511
    # Step: check device activation
4512
    self.proc.LogStep(1, steps_total, "check device existence")
4513
    info("checking volume groups")
4514
    my_vg = cfg.GetVGName()
4515
    results = self.rpc.call_vg_list([pri_node, new_node])
4516
    for node in pri_node, new_node:
4517
      res = results[node]
4518
      if res.failed or not res.data or my_vg not in res.data:
4519
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4520
                                 (my_vg, node))
4521
    for idx, dev in enumerate(instance.disks):
4522
      if idx not in self.op.disks:
4523
        continue
4524
      info("checking disk/%d on %s" % (idx, pri_node))
4525
      cfg.SetDiskID(dev, pri_node)
4526
      result = self.rpc.call_blockdev_find(pri_node, dev)
4527
      result.Raise()
4528
      if not result.data:
4529
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4530
                                 (idx, pri_node))
4531

    
4532
    # Step: check other node consistency
4533
    self.proc.LogStep(2, steps_total, "check peer consistency")
4534
    for idx, dev in enumerate(instance.disks):
4535
      if idx not in self.op.disks:
4536
        continue
4537
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4538
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4539
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4540
                                 " unsafe to replace the secondary" %
4541
                                 pri_node)
4542

    
4543
    # Step: create new storage
4544
    self.proc.LogStep(3, steps_total, "allocate new storage")
4545
    for idx, dev in enumerate(instance.disks):
4546
      size = dev.size
4547
      info("adding new local storage on %s for disk/%d" %
4548
           (new_node, idx))
4549
      # since we *always* want to create this LV, we use the
4550
      # _Create...OnPrimary (which forces the creation), even if we
4551
      # are talking about the secondary node
4552
      for new_lv in dev.children:
4553
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4554
                                        _GetInstanceInfoText(instance)):
4555
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4556
                                   " node '%s'" %
4557
                                   (new_lv.logical_id[1], new_node))
4558

    
4559
    # Step 4: drbd minors and drbd setup changes
4560
    # after this, we must manually remove the drbd minors on both the
4561
    # error and the success paths
4562
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4563
                                   instance.name)
4564
    logging.debug("Allocated minors %s" % (minors,))
4565
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4566
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4567
      size = dev.size
4568
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4569
      # create new devices on new_node
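      # The DRBD8 logical_id handled here is a 6-tuple, presumably
      # (node_a, node_b, port, minor_a, minor_b, secret): we only swap in the
      # new node and its freshly allocated minor and keep the other fields.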
4570
      if pri_node == dev.logical_id[0]:
4571
        new_logical_id = (pri_node, new_node,
4572
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4573
                          dev.logical_id[5])
4574
      else:
4575
        new_logical_id = (new_node, pri_node,
4576
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4577
                          dev.logical_id[5])
4578
      iv_names[idx] = (dev, dev.children, new_logical_id)
4579
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4580
                    new_logical_id)
4581
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4582
                              logical_id=new_logical_id,
4583
                              children=dev.children)
4584
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4585
                                        new_drbd, False,
4586
                                        _GetInstanceInfoText(instance)):
4587
        self.cfg.ReleaseDRBDMinors(instance.name)
4588
        raise errors.OpExecError("Failed to create new DRBD on"
4589
                                 " node '%s'" % new_node)
4590

    
4591
    for idx, dev in enumerate(instance.disks):
4592
      # we have new devices, shutdown the drbd on the old secondary
4593
      info("shutting down drbd for disk/%d on old node" % idx)
4594
      cfg.SetDiskID(dev, old_node)
4595
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
4596
      if result.failed or not result.data:
4597
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4598
                hint="Please cleanup this device manually as soon as possible")
4599

    
4600
    info("detaching primary drbds from the network (=> standalone)")
4601
    done = 0
4602
    for idx, dev in enumerate(instance.disks):
4603
      cfg.SetDiskID(dev, pri_node)
4604
      # set the network part of the physical (unique in bdev terms) id
4605
      # to None, meaning detach from network
4606
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4607
      # and 'find' the device, which will 'fix' it to match the
4608
      # standalone state
4609
      result = self.rpc.call_blockdev_find(pri_node, dev)
4610
      if not result.failed and result.data:
4611
        done += 1
4612
      else:
4613
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4614
                idx)
4615

    
4616
    if not done:
4617
      # no detaches succeeded (very unlikely)
4618
      self.cfg.ReleaseDRBDMinors(instance.name)
4619
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4620

    
4621
    # if we managed to detach at least one, we update all the disks of
4622
    # the instance to point to the new secondary
4623
    info("updating instance configuration")
4624
    for dev, _, new_logical_id in iv_names.itervalues():
4625
      dev.logical_id = new_logical_id
4626
      cfg.SetDiskID(dev, pri_node)
4627
    cfg.Update(instance)
4628
    # we can now remove the temp minors, as the new values are
4629
    # written to the config file (and therefore stable)
4630
    self.cfg.ReleaseDRBDMinors(instance.name)
4631

    
4632
    # and now perform the drbd attach
4633
    info("attaching primary drbds to new secondary (standalone => connected)")
4634
    failures = []
4635
    for idx, dev in enumerate(instance.disks):
4636
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4637
      # since the attach is smart, it's enough to 'find' the device,
4638
      # it will automatically activate the network, if the physical_id
4639
      # is correct
4640
      cfg.SetDiskID(dev, pri_node)
4641
      logging.debug("Disk to attach: %s", dev)
4642
      result = self.rpc.call_blockdev_find(pri_node, dev)
4643
      if result.failed or not result.data:
4644
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4645
                "please do a gnt-instance info to see the status of disks")
4646

    
4647
    # this can fail as the old devices are degraded and _WaitForSync
4648
    # does a combined result over all disks, so we don't check its
4649
    # return value
4650
    self.proc.LogStep(5, steps_total, "sync devices")
4651
    _WaitForSync(self, instance, unlock=True)
4652

    
4653
    # so check manually all the devices
4654
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4655
      cfg.SetDiskID(dev, pri_node)
4656
      result = self.rpc.call_blockdev_find(pri_node, dev)
4657
      result.Raise()
4658
      if result.data[5]:
4659
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4660

    
4661
    self.proc.LogStep(6, steps_total, "removing old storage")
4662
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4663
      info("remove logical volumes for disk/%d" % idx)
4664
      for lv in old_lvs:
4665
        cfg.SetDiskID(lv, old_node)
4666
        result = self.rpc.call_blockdev_remove(old_node, lv)
4667
        if result.failed or not result.data:
4668
          warning("Can't remove LV on old secondary",
4669
                  hint="Cleanup stale volumes by hand")
4670

    
4671
  def Exec(self, feedback_fn):
4672
    """Execute disk replacement.
4673

4674
    This dispatches the disk replacement to the appropriate handler.
4675

4676
    """
4677
    instance = self.instance
4678

    
4679
    # Activate the instance disks if we're replacing them on a down instance
4680
    if instance.status == "down":
4681
      _StartInstanceDisks(self, instance, True)
4682

    
4683
    if instance.disk_template == constants.DT_DRBD8:
4684
      if self.op.remote_node is None:
4685
        fn = self._ExecD8DiskOnly
4686
      else:
4687
        fn = self._ExecD8Secondary
4688
    else:
4689
      raise errors.ProgrammerError("Unhandled disk replacement case")
4690

    
4691
    ret = fn(feedback_fn)
4692

    
4693
    # Deactivate the instance disks if we're replacing them on a down instance
4694
    if instance.status == "down":
4695
      _SafeShutdownInstanceDisks(self, instance)
4696

    
4697
    return ret
4698

    
4699

    
4700
class LUGrowDisk(LogicalUnit):
4701
  """Grow a disk of an instance.
4702

4703
  """
4704
  HPATH = "disk-grow"
4705
  HTYPE = constants.HTYPE_INSTANCE
4706
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4707
  REQ_BGL = False
4708

    
4709
  def ExpandNames(self):
4710
    self._ExpandAndLockInstance()
4711
    self.needed_locks[locking.LEVEL_NODE] = []
4712
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4713

    
4714
  def DeclareLocks(self, level):
4715
    if level == locking.LEVEL_NODE:
4716
      self._LockInstancesNodes()
4717

    
4718
  def BuildHooksEnv(self):
4719
    """Build hooks env.
4720

4721
    This runs on the master, the primary and all the secondaries.
4722

4723
    """
4724
    env = {
4725
      "DISK": self.op.disk,
4726
      "AMOUNT": self.op.amount,
4727
      }
4728
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4729
    nl = [
4730
      self.cfg.GetMasterNode(),
4731
      self.instance.primary_node,
4732
      ]
4733
    return env, nl, nl
4734

    
4735
  def CheckPrereq(self):
4736
    """Check prerequisites.
4737

4738
    This checks that the instance is in the cluster.
4739

4740
    """
4741
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4742
    assert instance is not None, \
4743
      "Cannot retrieve locked instance %s" % self.op.instance_name
4744

    
4745
    self.instance = instance
4746

    
4747
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4748
      raise errors.OpPrereqError("Instance's disk layout does not support"
4749
                                 " growing.")
4750

    
4751
    self.disk = instance.FindDisk(self.op.disk)
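    # self.op.amount is the growth delta in MiB; every node that holds this
    # disk (the primary plus any secondaries) must have at least that much
    # free space in the volume group, which the loop below verifies.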
4752

    
4753
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4754
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4755
                                       instance.hypervisor)
4756
    for node in nodenames:
4757
      info = nodeinfo[node]
4758
      if info.failed or not info.data:
4759
        raise errors.OpPrereqError("Cannot get current information"
4760
                                   " from node '%s'" % node)
4761
      vg_free = info.data.get('vg_free', None)
4762
      if not isinstance(vg_free, int):
4763
        raise errors.OpPrereqError("Can't compute free disk space on"
4764
                                   " node %s" % node)
4765
      if self.op.amount > vg_free:
4766
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4767
                                   " %d MiB available, %d MiB required" %
4768
                                   (node, vg_free, self.op.amount))
4769

    
4770
  def Exec(self, feedback_fn):
4771
    """Execute disk grow.
4772

4773
    """
4774
    instance = self.instance
4775
    disk = self.disk
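    # Grow the secondaries first and the primary last, presumably so that a
    # DRBD primary is never larger than its mirror while the grow is running.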
4776
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4777
      self.cfg.SetDiskID(disk, node)
4778
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4779
      result.Raise()
4780
      if (not result.data or not isinstance(result.data, (list, tuple)) or
4781
          len(result.data) != 2):
4782
        raise errors.OpExecError("Grow request failed to node %s" % node)
4783
      elif not result.data[0]:
4784
        raise errors.OpExecError("Grow request failed to node %s: %s" %
4785
                                 (node, result.data[1]))
4786
    disk.RecordGrow(self.op.amount)
4787
    self.cfg.Update(instance)
4788
    if self.op.wait_for_sync:
4789
      disk_abort = not _WaitForSync(self, instance)
4790
      if disk_abort:
4791
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4792
                             " status.\nPlease check the instance.")
4793

    
4794

    
4795
class LUQueryInstanceData(NoHooksLU):
4796
  """Query runtime instance data.
4797

4798
  """
4799
  _OP_REQP = ["instances", "static"]
4800
  REQ_BGL = False
4801

    
4802
  def ExpandNames(self):
4803
    self.needed_locks = {}
4804
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4805

    
4806
    if not isinstance(self.op.instances, list):
4807
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4808

    
4809
    if self.op.instances:
4810
      self.wanted_names = []
4811
      for name in self.op.instances:
4812
        full_name = self.cfg.ExpandInstanceName(name)
4813
        if full_name is None:
4814
          raise errors.OpPrereqError("Instance '%s' not known" %
4815
                                     name)
4816
        self.wanted_names.append(full_name)
4817
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4818
    else:
4819
      self.wanted_names = None
4820
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4821

    
4822
    self.needed_locks[locking.LEVEL_NODE] = []
4823
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4824

    
4825
  def DeclareLocks(self, level):
4826
    if level == locking.LEVEL_NODE:
4827
      self._LockInstancesNodes()
4828

    
4829
  def CheckPrereq(self):
4830
    """Check prerequisites.
4831

4832
    This only checks the optional instance list against the existing names.
4833

4834
    """
4835
    if self.wanted_names is None:
4836
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4837

    
4838
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4839
                             in self.wanted_names]
4840
    return
4841

    
4842
  def _ComputeDiskStatus(self, instance, snode, dev):
4843
    """Compute block device status.
4844

4845
    """
4846
    static = self.op.static
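    # In static mode we skip all RPCs and report only configuration data;
    # pstatus and sstatus are then left as None.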
4847
    if not static:
4848
      self.cfg.SetDiskID(dev, instance.primary_node)
4849
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4850
      dev_pstatus.Raise()
4851
      dev_pstatus = dev_pstatus.data
4852
    else:
4853
      dev_pstatus = None
4854

    
4855
    if dev.dev_type in constants.LDS_DRBD:
4856
      # we change the snode then (otherwise we use the one passed in)
4857
      if dev.logical_id[0] == instance.primary_node:
4858
        snode = dev.logical_id[1]
4859
      else:
4860
        snode = dev.logical_id[0]
4861

    
4862
    if snode and not static:
4863
      self.cfg.SetDiskID(dev, snode)
4864
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4865
      dev_sstatus.Raise()
4866
      dev_sstatus = dev_sstatus.data
4867
    else:
4868
      dev_sstatus = None
4869

    
4870
    if dev.children:
4871
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4872
                      for child in dev.children]
4873
    else:
4874
      dev_children = []
4875

    
4876
    data = {
4877
      "iv_name": dev.iv_name,
4878
      "dev_type": dev.dev_type,
4879
      "logical_id": dev.logical_id,
4880
      "physical_id": dev.physical_id,
4881
      "pstatus": dev_pstatus,
4882
      "sstatus": dev_sstatus,
4883
      "children": dev_children,
4884
      "mode": dev.mode,
4885
      }
4886

    
4887
    return data
4888

    
4889
  def Exec(self, feedback_fn):
4890
    """Gather and return data"""
4891
    result = {}
4892

    
4893
    cluster = self.cfg.GetClusterInfo()
4894

    
4895
    for instance in self.wanted_instances:
4896
      if not self.op.static:
4897
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4898
                                                  instance.name,
4899
                                                  instance.hypervisor)
4900
        remote_info.Raise()
4901
        remote_info = remote_info.data
4902
        if remote_info and "state" in remote_info:
4903
          remote_state = "up"
4904
        else:
4905
          remote_state = "down"
4906
      else:
4907
        remote_state = None
4908
      if instance.status == "down":
4909
        config_state = "down"
4910
      else:
4911
        config_state = "up"
4912

    
4913
      disks = [self._ComputeDiskStatus(instance, None, device)
4914
               for device in instance.disks]
4915

    
4916
      idict = {
4917
        "name": instance.name,
4918
        "config_state": config_state,
4919
        "run_state": remote_state,
4920
        "pnode": instance.primary_node,
4921
        "snodes": instance.secondary_nodes,
4922
        "os": instance.os,
4923
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4924
        "disks": disks,
4925
        "hypervisor": instance.hypervisor,
4926
        "network_port": instance.network_port,
4927
        "hv_instance": instance.hvparams,
4928
        "hv_actual": cluster.FillHV(instance),
4929
        "be_instance": instance.beparams,
4930
        "be_actual": cluster.FillBE(instance),
4931
        }
4932

    
4933
      result[instance.name] = idict
4934

    
4935
    return result
4936

    
4937

    
4938
class LUSetInstanceParams(LogicalUnit):
4939
  """Modifies an instances's parameters.
4940

4941
  """
4942
  HPATH = "instance-modify"
4943
  HTYPE = constants.HTYPE_INSTANCE
4944
  _OP_REQP = ["instance_name"]
4945
  REQ_BGL = False
4946

    
4947
  def CheckArguments(self):
4948
    if not hasattr(self.op, 'nics'):
4949
      self.op.nics = []
4950
    if not hasattr(self.op, 'disks'):
4951
      self.op.disks = []
4952
    if not hasattr(self.op, 'beparams'):
4953
      self.op.beparams = {}
4954
    if not hasattr(self.op, 'hvparams'):
4955
      self.op.hvparams = {}
4956
    self.op.force = getattr(self.op, "force", False)
4957
    if not (self.op.nics or self.op.disks or
4958
            self.op.hvparams or self.op.beparams):
4959
      raise errors.OpPrereqError("No changes submitted")
4960

    
4961
    utils.CheckBEParams(self.op.beparams)
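    # self.op.disks and self.op.nics are lists of (op, params) pairs, where
    # op is DDM_ADD, DDM_REMOVE or the integer index of an existing item.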
4962

    
4963
    # Disk validation
4964
    disk_addremove = 0
4965
    for disk_op, disk_dict in self.op.disks:
4966
      if disk_op == constants.DDM_REMOVE:
4967
        disk_addremove += 1
4968
        continue
4969
      elif disk_op == constants.DDM_ADD:
4970
        disk_addremove += 1
4971
      else:
4972
        if not isinstance(disk_op, int):
4973
          raise errors.OpPrereqError("Invalid disk index")
4974
      if disk_op == constants.DDM_ADD:
4975
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4976
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4977
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4978
        size = disk_dict.get('size', None)
4979
        if size is None:
4980
          raise errors.OpPrereqError("Required disk parameter size missing")
4981
        try:
4982
          size = int(size)
4983
        except ValueError, err:
4984
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4985
                                     str(err))
4986
        disk_dict['size'] = size
4987
      else:
4988
        # modification of disk
4989
        if 'size' in disk_dict:
4990
          raise errors.OpPrereqError("Disk size change not possible, use"
4991
                                     " grow-disk")
4992

    
4993
    if disk_addremove > 1:
4994
      raise errors.OpPrereqError("Only one disk add or remove operation"
4995
                                 " supported at a time")
4996

    
4997
    # NIC validation
4998
    nic_addremove = 0
4999
    for nic_op, nic_dict in self.op.nics:
5000
      if nic_op == constants.DDM_REMOVE:
5001
        nic_addremove += 1
5002
        continue
5003
      elif nic_op == constants.DDM_ADD:
5004
        nic_addremove += 1
5005
      else:
5006
        if not isinstance(nic_op, int):
5007
          raise errors.OpPrereqError("Invalid nic index")
5008

    
5009
      # nic_dict should be a dict
5010
      nic_ip = nic_dict.get('ip', None)
5011
      if nic_ip is not None:
5012
        if nic_ip.lower() == "none":
5013
          nic_dict['ip'] = None
5014
        else:
5015
          if not utils.IsValidIP(nic_ip):
5016
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5017
      # we can only check None bridges and assign the default one
5018
      nic_bridge = nic_dict.get('bridge', None)
5019
      if nic_bridge is None:
5020
        nic_dict['bridge'] = self.cfg.GetDefBridge()
5021
      # but we can validate MACs
5022
      nic_mac = nic_dict.get('mac', None)
5023
      if nic_mac is not None:
5024
        if self.cfg.IsMacInUse(nic_mac):
5025
          raise errors.OpPrereqError("MAC address %s already in use"
5026
                                     " in cluster" % nic_mac)
5027
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5028
          if not utils.IsValidMac(nic_mac):
5029
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5030
    if nic_addremove > 1:
5031
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5032
                                 " supported at a time")
5033

    
5034
  def ExpandNames(self):
5035
    self._ExpandAndLockInstance()
5036
    self.needed_locks[locking.LEVEL_NODE] = []
5037
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5038

    
5039
  def DeclareLocks(self, level):
5040
    if level == locking.LEVEL_NODE:
5041
      self._LockInstancesNodes()
5042

    
5043
  def BuildHooksEnv(self):
5044
    """Build hooks env.
5045

5046
    This runs on the master, primary and secondaries.
5047

5048
    """
5049
    args = dict()
5050
    if constants.BE_MEMORY in self.be_new:
5051
      args['memory'] = self.be_new[constants.BE_MEMORY]
5052
    if constants.BE_VCPUS in self.be_new:
5053
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5054
    # FIXME: readd disk/nic changes
5055
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5056
    nl = [self.cfg.GetMasterNode(),
5057
          self.instance.primary_node] + list(self.instance.secondary_nodes)
5058
    return env, nl, nl
5059

    
5060
  def CheckPrereq(self):
5061
    """Check prerequisites.
5062

5063
    This only checks the instance list against the existing names.
5064

5065
    """
5066
    force = self.force = self.op.force
5067

    
5068
    # checking the new params on the primary/secondary nodes
5069

    
5070
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5071
    assert self.instance is not None, \
5072
      "Cannot retrieve locked instance %s" % self.op.instance_name
5073
    pnode = self.instance.primary_node
5074
    nodelist = [pnode]
5075
    nodelist.extend(instance.secondary_nodes)
5076

    
5077
    # hvparams processing
5078
    if self.op.hvparams:
5079
      i_hvdict = copy.deepcopy(instance.hvparams)
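      # VALUE_DEFAULT drops the key so the cluster-level default applies
      # again, while VALUE_NONE stores an explicit None for the parameter.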
5080
      for key, val in self.op.hvparams.iteritems():
5081
        if val == constants.VALUE_DEFAULT:
5082
          try:
5083
            del i_hvdict[key]
5084
          except KeyError:
5085
            pass
5086
        elif val == constants.VALUE_NONE:
5087
          i_hvdict[key] = None
5088
        else:
5089
          i_hvdict[key] = val
5090
      cluster = self.cfg.GetClusterInfo()
5091
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5092
                                i_hvdict)
5093
      # local check
5094
      hypervisor.GetHypervisor(
5095
        instance.hypervisor).CheckParameterSyntax(hv_new)
5096
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5097
      self.hv_new = hv_new # the new actual values
5098
      self.hv_inst = i_hvdict # the new dict (without defaults)
5099
    else:
5100
      self.hv_new = self.hv_inst = {}
5101

    
5102
    # beparams processing
5103
    if self.op.beparams:
5104
      i_bedict = copy.deepcopy(instance.beparams)
5105
      for key, val in self.op.beparams.iteritems():
5106
        if val == constants.VALUE_DEFAULT:
5107
          try:
5108
            del i_bedict[key]
5109
          except KeyError:
5110
            pass
5111
        else:
5112
          i_bedict[key] = val
5113
      cluster = self.cfg.GetClusterInfo()
5114
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5115
                                i_bedict)
5116
      self.be_new = be_new # the new actual values
5117
      self.be_inst = i_bedict # the new dict (without defaults)
5118
    else:
5119
      self.be_new = self.be_inst = {}
5120

    
5121
    self.warn = []
5122

    
5123
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5124
      mem_check_list = [pnode]
5125
      if be_new[constants.BE_AUTO_BALANCE]:
5126
        # either we changed auto_balance to yes or it was from before
5127
        mem_check_list.extend(instance.secondary_nodes)
5128
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5129
                                                  instance.hypervisor)
5130
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5131
                                         instance.hypervisor)
5132
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5133
        # Assume the primary node is unreachable and go ahead
5134
        self.warn.append("Can't get info from primary node %s" % pnode)
5135
      else:
5136
        if not instance_info.failed and instance_info.data:
5137
          current_mem = instance_info.data['memory']
5138
        else:
5139
          # Assume instance not running
5140
          # (there is a slight race condition here, but it's not very probable,
5141
          # and we have no other way to check)
5142
          current_mem = 0
5143
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5144
                    nodeinfo[pnode].data['memory_free'])
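        # Worked example: raising memory from 512 to 2048 MiB on a node with
        # only 1024 MiB free gives miss_mem = 2048 - 512 - 1024 = 512 > 0,
        # so the change is refused (unless the force flag is given).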
5145
        if miss_mem > 0:
5146
          raise errors.OpPrereqError("This change will prevent the instance"
5147
                                     " from starting, due to %d MB of memory"
5148
                                     " missing on its primary node" % miss_mem)
5149

    
5150
      if be_new[constants.BE_AUTO_BALANCE]:
5151
        for node in instance.secondary_nodes:
          nres = nodeinfo[node]
5152
          if nres.failed or not isinstance(nres.data, dict):
5153
            self.warn.append("Can't get info from secondary node %s" % node)
5154
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5155
            self.warn.append("Not enough memory to failover instance to"
5156
                             " secondary node %s" % node)
5157

    
5158
    # NIC processing
5159
    for nic_op, nic_dict in self.op.nics:
5160
      if nic_op == constants.DDM_REMOVE:
5161
        if not instance.nics:
5162
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5163
        continue
5164
      if nic_op != constants.DDM_ADD:
5165
        # an existing nic
5166
        if nic_op < 0 or nic_op >= len(instance.nics):
5167
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5168
                                     " are 0 to %d" %
5169
                                     (nic_op, len(instance.nics)))
5170
      nic_bridge = nic_dict.get('bridge', None)
5171
      if nic_bridge is not None:
5172
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5173
          msg = ("Bridge '%s' doesn't exist on one of"
5174
                 " the instance nodes" % nic_bridge)
5175
          if self.force:
5176
            self.warn.append(msg)
5177
          else:
5178
            raise errors.OpPrereqError(msg)
5179

    
5180
    # DISK processing
5181
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5182
      raise errors.OpPrereqError("Disk operations not supported for"
5183
                                 " diskless instances")
5184
    for disk_op, disk_dict in self.op.disks:
5185
      if disk_op == constants.DDM_REMOVE:
5186
        if len(instance.disks) == 1:
5187
          raise errors.OpPrereqError("Cannot remove the last disk of"
5188
                                     " an instance")
5189
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5190
        ins_l = ins_l[pnode]
5191
        if not type(ins_l) is list:
5192
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5193
        if instance.name in ins_l:
5194
          raise errors.OpPrereqError("Instance is running, can't remove"
5195
                                     " disks.")
5196

    
5197
      if (disk_op == constants.DDM_ADD and
5198
          len(instance.disks) >= constants.MAX_DISKS):
5199
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5200
                                   " add more" % constants.MAX_DISKS)
5201
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5202
        # an existing disk
5203
        if disk_op < 0 or disk_op >= len(instance.disks):
5204
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5205
                                     " are 0 to %d" %
5206
                                     (disk_op, len(instance.disks)))
5207

    
5208
    return
5209

    
5210
  def Exec(self, feedback_fn):
5211
    """Modifies an instance.
5212

5213
    All parameters take effect only at the next restart of the instance.
5214

5215
    """
5216
    # Process here the warnings from CheckPrereq, as we don't have a
5217
    # feedback_fn there.
5218
    for warn in self.warn:
5219
      feedback_fn("WARNING: %s" % warn)
5220

    
5221
    result = []
5222
    instance = self.instance
5223
    # disk changes
5224
    for disk_op, disk_dict in self.op.disks:
5225
      if disk_op == constants.DDM_REMOVE:
5226
        # remove the last disk
5227
        device = instance.disks.pop()
5228
        device_idx = len(instance.disks)
5229
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5230
          self.cfg.SetDiskID(disk, node)
5231
          rpc_result = self.rpc.call_blockdev_remove(node, disk)
5232
          if rpc_result.failed or not rpc_result.data:
5233
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
5234
                                 " continuing anyway", device_idx, node)
5235
        result.append(("disk/%d" % device_idx, "remove"))
5236
      elif disk_op == constants.DDM_ADD:
5237
        # add a new disk
5238
        if instance.disk_template == constants.DT_FILE:
5239
          file_driver, file_path = instance.disks[0].logical_id
5240
          file_path = os.path.dirname(file_path)
5241
        else:
5242
          file_driver = file_path = None
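        # For file-based instances the new disk goes into the same directory
        # as the first existing disk; for other templates both file
        # parameters stay None.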
5243
        disk_idx_base = len(instance.disks)
5244
        new_disk = _GenerateDiskTemplate(self,
5245
                                         instance.disk_template,
5246
                                         instance, instance.primary_node,
5247
                                         instance.secondary_nodes,
5248
                                         [disk_dict],
5249
                                         file_path,
5250
                                         file_driver,
5251
                                         disk_idx_base)[0]
5252
        new_disk.mode = disk_dict['mode']
5253
        instance.disks.append(new_disk)
5254
        info = _GetInstanceInfoText(instance)
5255

    
5256
        logging.info("Creating volume %s for instance %s",
5257
                     new_disk.iv_name, instance.name)
5258
        # Note: this needs to be kept in sync with _CreateDisks
5259
        #HARDCODE
5260
        for secondary_node in instance.secondary_nodes:
5261
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5262
                                            new_disk, False, info):
5263
            self.LogWarning("Failed to create volume %s (%s) on"
5264
                            " secondary node %s!",
5265
                            new_disk.iv_name, new_disk, secondary_node)
5266
        #HARDCODE
5267
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5268
                                        instance, new_disk, info):
5269
          self.LogWarning("Failed to create volume %s on primary!",
5270
                          new_disk.iv_name)
5271
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5272
                       (new_disk.size, new_disk.mode)))
5273
      else:
5274
        # change a given disk
5275
        instance.disks[disk_op].mode = disk_dict['mode']
5276
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5277
    # NIC changes
5278
    for nic_op, nic_dict in self.op.nics:
5279
      if nic_op == constants.DDM_REMOVE:
5280
        # remove the last nic
5281
        del instance.nics[-1]
5282
        result.append(("nic.%d" % len(instance.nics), "remove"))
5283
      elif nic_op == constants.DDM_ADD:
5284
        # add a new nic
5285
        if 'mac' not in nic_dict:
5286
          mac = constants.VALUE_GENERATE
5287
        else:
5288
          mac = nic_dict['mac']
5289
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5290
          mac = self.cfg.GenerateMAC()
5291
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5292
                              bridge=nic_dict.get('bridge', None))
5293
        instance.nics.append(new_nic)
5294
        result.append(("nic.%d" % (len(instance.nics) - 1),
5295
                       "add:mac=%s,ip=%s,bridge=%s" %
5296
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
5297
      else:
5298
        # change a given nic
5299
        for key in 'mac', 'ip', 'bridge':
5300
          if key in nic_dict:
5301
            setattr(instance.nics[nic_op], key, nic_dict[key])
5302
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5303

    
5304
    # hvparams changes
5305
    if self.op.hvparams:
5306
      instance.hvparams = self.hv_new
5307
      for key, val in self.op.hvparams.iteritems():
5308
        result.append(("hv/%s" % key, val))
5309

    
5310
    # beparams changes
5311
    if self.op.beparams:
5312
      instance.beparams = self.be_inst
5313
      for key, val in self.op.beparams.iteritems():
5314
        result.append(("be/%s" % key, val))
5315

    
5316
    self.cfg.Update(instance)
5317

    
5318
    return result
5319

    
5320

    
5321
class LUQueryExports(NoHooksLU):
5322
  """Query the exports list
5323

5324
  """
5325
  _OP_REQP = ['nodes']
5326
  REQ_BGL = False
5327

    
5328
  def ExpandNames(self):
5329
    self.needed_locks = {}
5330
    self.share_locks[locking.LEVEL_NODE] = 1
5331
    if not self.op.nodes:
5332
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5333
    else:
5334
      self.needed_locks[locking.LEVEL_NODE] = \
5335
        _GetWantedNodes(self, self.op.nodes)
5336

    
5337
  def CheckPrereq(self):
5338
    """Check prerequisites.
5339

5340
    """
5341
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5342

    
5343
  def Exec(self, feedback_fn):
5344
    """Compute the list of all the exported system images.
5345

5346
    @rtype: dict
5347
    @return: a dictionary with the structure node->(export-list)
5348
        where export-list is a list of the instances exported on
5349
        that node.
5350

5351
    """
5352
    result = self.rpc.call_export_list(self.nodes)
5353
    result.Raise()
5354
    return result.data
5355

    
5356

    
5357
class LUExportInstance(LogicalUnit):
5358
  """Export an instance to an image in the cluster.
5359

5360
  """
5361
  HPATH = "instance-export"
5362
  HTYPE = constants.HTYPE_INSTANCE
5363
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
5364
  REQ_BGL = False
5365

    
5366
  def ExpandNames(self):
5367
    self._ExpandAndLockInstance()
5368
    # FIXME: lock only instance primary and destination node
5369
    #
5370
    # Sad but true, for now we have do lock all nodes, as we don't know where
5371
    # the previous export might be, and and in this LU we search for it and
5372
    # remove it from its current node. In the future we could fix this by:
5373
    #  - making a tasklet to search (share-lock all), then create the new one,
5374
    #    then one to remove, after
5375
    #  - removing the removal operation altoghether
5376
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5377

    
5378
  def DeclareLocks(self, level):
5379
    """Last minute lock declaration."""
5380
    # All nodes are locked anyway, so nothing to do here.
5381

    
5382
  def BuildHooksEnv(self):
5383
    """Build hooks env.
5384

5385
    This will run on the master, primary node and target node.
5386

5387
    """
5388
    env = {
5389
      "EXPORT_NODE": self.op.target_node,
5390
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5391
      }
5392
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5393
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5394
          self.op.target_node]
5395
    return env, nl, nl
5396

    
5397
  def CheckPrereq(self):
5398
    """Check prerequisites.
5399

5400
    This checks that the instance and node names are valid.
5401

5402
    """
5403
    instance_name = self.op.instance_name
5404
    self.instance = self.cfg.GetInstanceInfo(instance_name)
5405
    assert self.instance is not None, \
5406
          "Cannot retrieve locked instance %s" % self.op.instance_name
5407

    
5408
    self.dst_node = self.cfg.GetNodeInfo(
5409
      self.cfg.ExpandNodeName(self.op.target_node))
5410

    
5411
    if self.dst_node is None:
5412
      # This is a wrong node name, not a non-locked node
5413
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5414

    
5415
    # instance disk type verification
5416
    for disk in self.instance.disks:
5417
      if disk.dev_type == constants.LD_FILE:
5418
        raise errors.OpPrereqError("Export not supported for instances with"
5419
                                   " file-based disks")
5420

    
5421
  def Exec(self, feedback_fn):
5422
    """Export an instance to an image in the cluster.
5423

5424
    """
5425
    instance = self.instance
5426
    dst_node = self.dst_node
5427
    src_node = instance.primary_node
5428
    if self.op.shutdown:
5429
      # shutdown the instance, but not the disks
5430
      result = self.rpc.call_instance_shutdown(src_node, instance)
5431
      result.Raise()
5432
      if not result.data:
5433
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5434
                                 (instance.name, src_node))
5435

    
5436
    vgname = self.cfg.GetVGName()
5437

    
5438
    snap_disks = []
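    # snap_disks collects one entry per instance disk: a Disk object for the
    # LVM snapshot, or False if snapshotting that disk failed; the list is
    # later exported disk by disk and passed to call_finalize_export.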
5439

    
5440
    try:
5441
      for disk in instance.disks:
5442
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5443
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5444
        if new_dev_name.failed or not new_dev_name.data:
5445
          self.LogWarning("Could not snapshot block device %s on node %s",
5446
                          disk.logical_id[1], src_node)
5447
          snap_disks.append(False)
5448
        else:
5449
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5450
                                 logical_id=(vgname, new_dev_name.data),
5451
                                 physical_id=(vgname, new_dev_name.data),
5452
                                 iv_name=disk.iv_name)
5453
          snap_disks.append(new_dev)
5454

    
5455
    finally:
5456
      if self.op.shutdown and instance.status == "up":
5457
        result = self.rpc.call_instance_start(src_node, instance, None)
5458
        if result.failed or not result.data:
5459
          _ShutdownInstanceDisks(self, instance)
5460
          raise errors.OpExecError("Could not start instance")
5461

    
5462
    # TODO: check for size
5463

    
5464
    cluster_name = self.cfg.GetClusterName()
5465
    for idx, dev in enumerate(snap_disks):
5466
      if dev:
5467
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5468
                                               instance, cluster_name, idx)
5469
        if result.failed or not result.data:
5470
          self.LogWarning("Could not export block device %s from node %s to"
5471
                          " node %s", dev.logical_id[1], src_node,
5472
                          dst_node.name)
5473
        result = self.rpc.call_blockdev_remove(src_node, dev)
5474
        if result.failed or not result.data:
5475
          self.LogWarning("Could not remove snapshot block device %s from node"
5476
                          " %s", dev.logical_id[1], src_node)
5477

    
5478
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5479
    if result.failed or not result.data:
5480
      self.LogWarning("Could not finalize export for instance %s on node %s",
5481
                      instance.name, dst_node.name)
5482

    
5483
    nodelist = self.cfg.GetNodeList()
5484
    nodelist.remove(dst_node.name)
5485

    
5486
    # on one-node clusters nodelist will be empty after the removal
5487
    # if we proceeded, the backup would be removed because OpQueryExports
5488
    # substitutes an empty list with the full cluster node list.
5489
    if nodelist:
5490
      exportlist = self.rpc.call_export_list(nodelist)
5491
      for node in exportlist:
5492
        if exportlist[node].failed:
5493
          continue
5494
        if instance.name in exportlist[node].data:
5495
          if not self.rpc.call_export_remove(node, instance.name):
5496
            self.LogWarning("Could not remove older export for instance %s"
5497
                            " on node %s", instance.name, node)
5498

    
5499

    
5500
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


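# The concrete tag LUs below (LUGetTags, LUAddTags, LUDelTags) rely on
# TagsLU.CheckPrereq having resolved self.target to the cluster, node or
# instance object named in the opcode.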
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _ALLO_KEYS or _RELO_KEYS class
      attributes, depending on the mode, are required)
    - four buffer attributes (in_data, in_text, out_data, out_text), that
      represent the input to the external script and the output from it,
      each in both data structure and serialized text format
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

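  # Usage sketch (illustrative values; LUTestAllocator.Exec below shows the
  # real in-tree call, with "lu" being the calling LogicalUnit):
  #   ial = IAllocator(lu, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=128,
  #                    disks=[{"size": 1024, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    disk_template=constants.DT_PLAIN, os="debian-etch",
  #                    tags=[], nics=[{"mac": "aa:00:00:11:22:33",
  #                                    "ip": None, "bridge": "xen-br0"}],
  #                    vcpus=1, hypervisor=lu.op.hypervisor)
  #   ial.Run("my-allocator")   # illustrative allocator script name
  #   if ial.success:
  #     chosen_nodes = ial.nodes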
  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

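  # The helpers below fill self.in_data, a dict whose top-level keys are
  # "version", "cluster_name", "cluster_tags", "enable_hypervisors", "nodes",
  # "instances" and finally "request"; _BuildInputData serializes it into
  # self.in_text for the external allocator script.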
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
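      # Note: besides summing up the configured memory of primary instances,
      # the loop below also lowers 'memory_free' by
      # max(0, configured BE_MEMORY - currently used memory) per primary
      # instance, so the allocator sees the node as if every primary instance
      # used its full configured memory.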
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compose the per-node result for the allocator
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

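  # For ALLOC mode, the "request" member built below carries the keys "type"
  # (always "allocate"), "name", "disk_template", "tags", "os", "vcpus",
  # "memory", "disks", "disk_space_total" and "nics", plus "required_nodes"
  # (2 for network-mirrored disk templates, 1 otherwise).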
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

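  # For RELOC mode, the "request" member is smaller: "type" (always
  # "relocate"), "name", "disk_space_total", "required_nodes" (always 1) and
  # "relocate_from" (the nodes the instance should be moved away from).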
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

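  # Run() invokes the iallocator runner RPC on the master node; the call
  # returns a (return code, stdout, stderr, fail reason) 4-tuple, and the
  # script's stdout becomes self.out_text, which _ValidateResult parses
  # unless validate=False is passed.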
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

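  # A well-formed allocator reply, once deserialized, looks roughly like
  # (illustrative values):
  #   {"success": True, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}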
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

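    # "in" direction: just return the allocator input text that was built;
    # "out" direction: actually run the named allocator and return its raw,
    # unvalidated output.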
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result