1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring the
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing it separately is better because:
118

119
      - ExpandNames is left as purely a lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods no longer need to worry about missing parameters.
125

126
    """
127
    pass
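    # Hedged sketch (not part of the original module) of a typical override,
    # assuming a hypothetical opcode with an optional boolean "force" flag:
    #
    #   def CheckArguments(self):
    #     if not hasattr(self.op, "force"):
    #       self.op.force = False
    #     elif not isinstance(self.op.force, bool):
    #       raise errors.OpPrereqError("Invalid 'force' parameter (non-boolean)")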
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, etc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_', as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If there are no nodes, an empty list (and not None) should be returned.
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
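    # Hedged sketch (not part of the original module): an instance-level LU
    # would typically build its environment with the helpers defined later in
    # this module, assuming CheckPrereq stored the instance in self.instance:
    #
    #   def BuildHooksEnv(self):
    #     env = _BuildInstanceHookEnvByObject(self, self.instance)
    #     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
    #     return env, nl, nl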
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
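    # Hedged usage sketch (illustrative only): an instance LU would normally
    # call this helper from ExpandNames and then ask for its node locks to be
    # recalculated once the instance lock is held:
    #
    #   def ExpandNames(self):
    #     self._ExpandAndLockInstance()
    #     self.needed_locks[locking.LEVEL_NODE] = []
    #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE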
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instances' nodes, or
295
    to just lock primaries or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
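

# Illustrative sketch only (not part of the original module): a minimal
# concurrent LU following the contract documented in LogicalUnit above. The
# opcode and its "node_name" parameter are hypothetical.
class _LUExampleNoop(NoHooksLU):
  """Example LU: expands and locks a single node, then reports its name."""
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # canonicalize the node name and declare the single node lock we need
    node = self.cfg.ExpandNodeName(self.op.node_name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
    self.op.node_name = node
    self.needed_locks = {locking.LEVEL_NODE: [node]}

  def CheckPrereq(self):
    # no cluster-state prerequisites for this example
    pass

  def Exec(self, feedback_fn):
    feedback_fn("Example LU ran for node %s" % self.op.node_name)
    return self.op.node_name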
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
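# Hedged usage sketch: a query-style LU would typically canonicalize a
# user-supplied node list in CheckPrereq with something like
#   self.wanted = _GetWantedNodes(self, self.op.nodes)
# (the "nodes" opcode attribute is illustrative).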
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396
  return wanted
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the node is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445
                          memory, vcpus, nics):
446
  """Builds instance related env variables for hooks
447

448
  This builds the hook environment from individual variables.
449

450
  @type name: string
451
  @param name: the name of the instance
452
  @type primary_node: string
453
  @param primary_node: the name of the instance's primary node
454
  @type secondary_nodes: list
455
  @param secondary_nodes: list of secondary nodes as strings
456
  @type os_type: string
457
  @param os_type: the name of the instance's OS
458
  @type status: boolean
459
  @param status: the should_run status of the instance
460
  @type memory: string
461
  @param memory: the memory size of the instance
462
  @type vcpus: string
463
  @param vcpus: the count of VCPUs the instance has
464
  @type nics: list
465
  @param nics: list of tuples (ip, bridge, mac) representing
466
      the NICs the instance has
467
  @rtype: dict
468
  @return: the hook environment for this instance
469

470
  """
471
  if status:
472
    str_status = "up"
473
  else:
474
    str_status = "down"
475
  env = {
476
    "OP_TARGET": name,
477
    "INSTANCE_NAME": name,
478
    "INSTANCE_PRIMARY": primary_node,
479
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
480
    "INSTANCE_OS_TYPE": os_type,
481
    "INSTANCE_STATUS": str_status,
482
    "INSTANCE_MEMORY": memory,
483
    "INSTANCE_VCPUS": vcpus,
484
  }
485

    
486
  if nics:
487
    nic_count = len(nics)
488
    for idx, (ip, bridge, mac) in enumerate(nics):
489
      if ip is None:
490
        ip = ""
491
      env["INSTANCE_NIC%d_IP" % idx] = ip
492
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
493
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
494
  else:
495
    nic_count = 0
496

    
497
  env["INSTANCE_NIC_COUNT"] = nic_count
498

    
499
  return env
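# Hedged illustration (hypothetical values): for a running single-NIC
# instance the dict built above would look roughly like
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1, "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01"}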
500

    
501

    
502
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
503
  """Builds instance related env variables for hooks from an object.
504

505
  @type lu: L{LogicalUnit}
506
  @param lu: the logical unit on whose behalf we execute
507
  @type instance: L{objects.Instance}
508
  @param instance: the instance for which we should build the
509
      environment
510
  @type override: dict
511
  @param override: dictionary with key/values that will override
512
      our values
513
  @rtype: dict
514
  @return: the hook environment dictionary
515

516
  """
517
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
518
  args = {
519
    'name': instance.name,
520
    'primary_node': instance.primary_node,
521
    'secondary_nodes': instance.secondary_nodes,
522
    'os_type': instance.os,
523
    'status': instance.admin_up,
524
    'memory': bep[constants.BE_MEMORY],
525
    'vcpus': bep[constants.BE_VCPUS],
526
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
527
  }
528
  if override:
529
    args.update(override)
530
  return _BuildInstanceHookEnv(**args)
531

    
532

    
533
def _AdjustCandidatePool(lu):
534
  """Adjust the candidate pool after node operations.
535

536
  """
537
  mod_list = lu.cfg.MaintainCandidatePool()
538
  if mod_list:
539
    lu.LogInfo("Promoted nodes to master candidate role: %s",
540
               ", ".join(node.name for node in mod_list))
541
    for name in mod_list:
542
      lu.context.ReaddNode(name)
543
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
544
  if mc_now > mc_max:
545
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
546
               (mc_now, mc_max))
547

    
548

    
549
def _CheckInstanceBridgesExist(lu, instance):
550
  """Check that the brigdes needed by an instance exist.
551

552
  """
553
  # check bridges existence
554
  brlist = [nic.bridge for nic in instance.nics]
555
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
556
  result.Raise()
557
  if not result.data:
558
    raise errors.OpPrereqError("One or more target bridges %s does not"
559
                               " exist on destination node '%s'" %
560
                               (brlist, instance.primary_node))
561

    
562

    
563
class LUDestroyCluster(NoHooksLU):
564
  """Logical unit for destroying the cluster.
565

566
  """
567
  _OP_REQP = []
568

    
569
  def CheckPrereq(self):
570
    """Check prerequisites.
571

572
    This checks whether the cluster is empty.
573

574
    Any errors are signalled by raising errors.OpPrereqError.
575

576
    """
577
    master = self.cfg.GetMasterNode()
578

    
579
    nodelist = self.cfg.GetNodeList()
580
    if len(nodelist) != 1 or nodelist[0] != master:
581
      raise errors.OpPrereqError("There are still %d node(s) in"
582
                                 " this cluster." % (len(nodelist) - 1))
583
    instancelist = self.cfg.GetInstanceList()
584
    if instancelist:
585
      raise errors.OpPrereqError("There are still %d instance(s) in"
586
                                 " this cluster." % len(instancelist))
587

    
588
  def Exec(self, feedback_fn):
589
    """Destroys the cluster.
590

591
    """
592
    master = self.cfg.GetMasterNode()
593
    result = self.rpc.call_node_stop_master(master, False)
594
    result.Raise()
595
    if not result.data:
596
      raise errors.OpExecError("Could not disable the master role")
597
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598
    utils.CreateBackup(priv_key)
599
    utils.CreateBackup(pub_key)
600
    return master
601

    
602

    
603
class LUVerifyCluster(LogicalUnit):
604
  """Verifies the cluster status.
605

606
  """
607
  HPATH = "cluster-verify"
608
  HTYPE = constants.HTYPE_CLUSTER
609
  _OP_REQP = ["skip_checks"]
610
  REQ_BGL = False
611

    
612
  def ExpandNames(self):
613
    self.needed_locks = {
614
      locking.LEVEL_NODE: locking.ALL_SET,
615
      locking.LEVEL_INSTANCE: locking.ALL_SET,
616
    }
617
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
618

    
619
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
620
                  node_result, feedback_fn, master_files,
621
                  drbd_map):
622
    """Run multiple tests against a node.
623

624
    Test list:
625

626
      - compares ganeti version
627
      - checks vg existence and size > 20G
628
      - checks config file checksum
629
      - checks ssh to other nodes
630

631
    @type nodeinfo: L{objects.Node}
632
    @param nodeinfo: the node to check
633
    @param file_list: required list of files
634
    @param local_cksum: dictionary of local files and their checksums
635
    @param node_result: the results from the node
636
    @param feedback_fn: function used to accumulate results
637
    @param master_files: list of files that only masters should have
638
    @param drbd_map: the used DRBD minors for this node, in the
639
        form of minor: (instance, must_exist) which correspond to instances
640
        and their running status
641

642
    """
643
    node = nodeinfo.name
644

    
645
    # main result, node_result should be a non-empty dict
646
    if not node_result or not isinstance(node_result, dict):
647
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
648
      return True
649

    
650
    # compares ganeti version
651
    local_version = constants.PROTOCOL_VERSION
652
    remote_version = node_result.get('version', None)
653
    if not remote_version:
654
      feedback_fn("  - ERROR: connection to %s failed" % (node))
655
      return True
656

    
657
    if local_version != remote_version:
658
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
659
                      (local_version, node, remote_version))
660
      return True
661

    
662
    # checks vg existence and size > 20G
663

    
664
    bad = False
665
    vglist = node_result.get(constants.NV_VGLIST, None)
666
    if not vglist:
667
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
668
                      (node,))
669
      bad = True
670
    else:
671
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
672
                                            constants.MIN_VG_SIZE)
673
      if vgstatus:
674
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
675
        bad = True
676

    
677
    # checks config file checksum
678

    
679
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
680
    if not isinstance(remote_cksum, dict):
681
      bad = True
682
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
683
    else:
684
      for file_name in file_list:
685
        node_is_mc = nodeinfo.master_candidate
686
        must_have_file = file_name not in master_files
687
        if file_name not in remote_cksum:
688
          if node_is_mc or must_have_file:
689
            bad = True
690
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
691
        elif remote_cksum[file_name] != local_cksum[file_name]:
692
          if node_is_mc or must_have_file:
693
            bad = True
694
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695
          else:
696
            # not candidate and this is not a must-have file
697
            bad = True
698
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
699
                        " '%s'" % file_name)
700
        else:
701
          # all good, except non-master/non-must have combination
702
          if not node_is_mc and not must_have_file:
703
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
704
                        " candidates" % file_name)
705

    
706
    # checks ssh to any
707

    
708
    if constants.NV_NODELIST not in node_result:
709
      bad = True
710
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
711
    else:
712
      if node_result[constants.NV_NODELIST]:
713
        bad = True
714
        for node in node_result[constants.NV_NODELIST]:
715
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
716
                          (node, node_result[constants.NV_NODELIST][node]))
717

    
718
    if constants.NV_NODENETTEST not in node_result:
719
      bad = True
720
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
721
    else:
722
      if node_result[constants.NV_NODENETTEST]:
723
        bad = True
724
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
725
        for node in nlist:
726
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
727
                          (node, node_result[constants.NV_NODENETTEST][node]))
728

    
729
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
730
    if isinstance(hyp_result, dict):
731
      for hv_name, hv_result in hyp_result.iteritems():
732
        if hv_result is not None:
733
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
734
                      (hv_name, hv_result))
735

    
736
    # check used drbd list
737
    used_minors = node_result.get(constants.NV_DRBDLIST, [])
738
    for minor, (iname, must_exist) in drbd_map.items():
739
      if minor not in used_minors and must_exist:
740
        feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
741
                    (minor, iname))
742
        bad = True
743
    for minor in used_minors:
744
      if minor not in drbd_map:
745
        feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
746
        bad = True
747

    
748
    return bad
749

    
750
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
751
                      node_instance, feedback_fn, n_offline):
752
    """Verify an instance.
753

754
    This function checks to see if the required block devices are
755
    available on the instance's node.
756

757
    """
758
    bad = False
759

    
760
    node_current = instanceconfig.primary_node
761

    
762
    node_vol_should = {}
763
    instanceconfig.MapLVsByNode(node_vol_should)
764

    
765
    for node in node_vol_should:
766
      if node in n_offline:
767
        # ignore missing volumes on offline nodes
768
        continue
769
      for volume in node_vol_should[node]:
770
        if node not in node_vol_is or volume not in node_vol_is[node]:
771
          feedback_fn("  - ERROR: volume %s missing on node %s" %
772
                          (volume, node))
773
          bad = True
774

    
775
    if instanceconfig.admin_up:
776
      if ((node_current not in node_instance or
777
          not instance in node_instance[node_current]) and
778
          node_current not in n_offline):
779
        feedback_fn("  - ERROR: instance %s not running on node %s" %
780
                        (instance, node_current))
781
        bad = True
782

    
783
    for node in node_instance:
784
      if (not node == node_current):
785
        if instance in node_instance[node]:
786
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
787
                          (instance, node))
788
          bad = True
789

    
790
    return bad
791

    
792
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
793
    """Verify if there are any unknown volumes in the cluster.
794

795
    The .os, .swap and backup volumes are ignored. All other volumes are
796
    reported as unknown.
797

798
    """
799
    bad = False
800

    
801
    for node in node_vol_is:
802
      for volume in node_vol_is[node]:
803
        if node not in node_vol_should or volume not in node_vol_should[node]:
804
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
805
                      (volume, node))
806
          bad = True
807
    return bad
808

    
809
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
810
    """Verify the list of running instances.
811

812
    This checks what instances are running but unknown to the cluster.
813

814
    """
815
    bad = False
816
    for node in node_instance:
817
      for runninginstance in node_instance[node]:
818
        if runninginstance not in instancelist:
819
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
820
                          (runninginstance, node))
821
          bad = True
822
    return bad
823

    
824
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
825
    """Verify N+1 Memory Resilience.
826

827
    Check that if a single node dies we can still start all the instances it
828
    was primary for.
829

830
    """
831
    bad = False
832

    
833
    for node, nodeinfo in node_info.iteritems():
834
      # This code checks that every node which is now listed as secondary has
835
      # enough memory to host all instances it is supposed to, should a single
836
      # other node in the cluster fail.
837
      # FIXME: not ready for failover to an arbitrary node
838
      # FIXME: does not support file-backed instances
839
      # WARNING: we currently take into account down instances as well as up
840
      # ones, considering that even if they're down someone might want to start
841
      # them even in the event of a node failure.
842
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
843
        needed_mem = 0
844
        for instance in instances:
845
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
846
          if bep[constants.BE_AUTO_BALANCE]:
847
            needed_mem += bep[constants.BE_MEMORY]
848
        if nodeinfo['mfree'] < needed_mem:
849
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
850
                      " failovers should node %s fail" % (node, prinode))
851
          bad = True
852
    return bad
853

    
854
  def CheckPrereq(self):
855
    """Check prerequisites.
856

857
    Transform the list of checks we're going to skip into a set and check that
858
    all its members are valid.
859

860
    """
861
    self.skip_set = frozenset(self.op.skip_checks)
862
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
863
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
864

    
865
  def BuildHooksEnv(self):
866
    """Build hooks env.
867

868
    Cluster-Verify hooks are run only in the post phase; their failure makes
869
    the output be logged in the verify output and the verification fail.
870

871
    """
872
    all_nodes = self.cfg.GetNodeList()
873
    # TODO: populate the environment with useful information for verify hooks
874
    env = {}
875
    return env, [], all_nodes
876

    
877
  def Exec(self, feedback_fn):
878
    """Verify integrity of cluster, performing various test on nodes.
879

880
    """
881
    bad = False
882
    feedback_fn("* Verifying global settings")
883
    for msg in self.cfg.VerifyConfig():
884
      feedback_fn("  - ERROR: %s" % msg)
885

    
886
    vg_name = self.cfg.GetVGName()
887
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
888
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
889
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
890
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
891
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
892
                        for iname in instancelist)
893
    i_non_redundant = [] # Non redundant instances
894
    i_non_a_balanced = [] # Non auto-balanced instances
895
    n_offline = [] # List of offline nodes
896
    node_volume = {}
897
    node_instance = {}
898
    node_info = {}
899
    instance_cfg = {}
900

    
901
    # FIXME: verify OS list
902
    # do local checksums
903
    master_files = [constants.CLUSTER_CONF_FILE]
904

    
905
    file_names = ssconf.SimpleStore().GetFileList()
906
    file_names.append(constants.SSL_CERT_FILE)
907
    file_names.append(constants.RAPI_CERT_FILE)
908
    file_names.extend(master_files)
909

    
910
    local_checksums = utils.FingerprintFiles(file_names)
911

    
912
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
913
    node_verify_param = {
914
      constants.NV_FILELIST: file_names,
915
      constants.NV_NODELIST: [node.name for node in nodeinfo
916
                              if not node.offline],
917
      constants.NV_HYPERVISOR: hypervisors,
918
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
919
                                  node.secondary_ip) for node in nodeinfo
920
                                 if not node.offline],
921
      constants.NV_LVLIST: vg_name,
922
      constants.NV_INSTANCELIST: hypervisors,
923
      constants.NV_VGLIST: None,
924
      constants.NV_VERSION: None,
925
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
926
      constants.NV_DRBDLIST: None,
927
      }
928
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
929
                                           self.cfg.GetClusterName())
930

    
931
    cluster = self.cfg.GetClusterInfo()
932
    master_node = self.cfg.GetMasterNode()
933
    all_drbd_map = self.cfg.ComputeDRBDMap()
934

    
935
    for node_i in nodeinfo:
936
      node = node_i.name
937
      nresult = all_nvinfo[node].data
938

    
939
      if node_i.offline:
940
        feedback_fn("* Skipping offline node %s" % (node,))
941
        n_offline.append(node)
942
        continue
943

    
944
      if node == master_node:
945
        ntype = "master"
946
      elif node_i.master_candidate:
947
        ntype = "master candidate"
948
      else:
949
        ntype = "regular"
950
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
951

    
952
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
953
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
954
        bad = True
955
        continue
956

    
957
      node_drbd = {}
958
      for minor, instance in all_drbd_map[node].items():
959
        instance = instanceinfo[instance]
960
        node_drbd[minor] = (instance.name, instance.admin_up)
961
      result = self._VerifyNode(node_i, file_names, local_checksums,
962
                                nresult, feedback_fn, master_files,
963
                                node_drbd)
964
      bad = bad or result
965

    
966
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
967
      if isinstance(lvdata, basestring):
968
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
969
                    (node, lvdata.encode('string_escape')))
970
        bad = True
971
        node_volume[node] = {}
972
      elif not isinstance(lvdata, dict):
973
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
974
        bad = True
975
        continue
976
      else:
977
        node_volume[node] = lvdata
978

    
979
      # node_instance
980
      idata = nresult.get(constants.NV_INSTANCELIST, None)
981
      if not isinstance(idata, list):
982
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
983
                    (node,))
984
        bad = True
985
        continue
986

    
987
      node_instance[node] = idata
988

    
989
      # node_info
990
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
991
      if not isinstance(nodeinfo, dict):
992
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
993
        bad = True
994
        continue
995

    
996
      try:
997
        node_info[node] = {
998
          "mfree": int(nodeinfo['memory_free']),
999
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1000
          "pinst": [],
1001
          "sinst": [],
1002
          # dictionary holding all instances this node is secondary for,
1003
          # grouped by their primary node. Each key is a cluster node, and each
1004
          # value is a list of instances which have the key as primary and the
1005
          # current node as secondary. This is handy to calculate N+1 memory
1006
          # availability if you can only failover from a primary to its
1007
          # secondary.
1008
          "sinst-by-pnode": {},
1009
        }
1010
      except ValueError:
1011
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1012
        bad = True
1013
        continue
1014

    
1015
    node_vol_should = {}
1016

    
1017
    for instance in instancelist:
1018
      feedback_fn("* Verifying instance %s" % instance)
1019
      inst_config = instanceinfo[instance]
1020
      result = self._VerifyInstance(instance, inst_config, node_volume,
1021
                                     node_instance, feedback_fn, n_offline)
1022
      bad = bad or result
1023
      inst_nodes_offline = []
1024

    
1025
      inst_config.MapLVsByNode(node_vol_should)
1026

    
1027
      instance_cfg[instance] = inst_config
1028

    
1029
      pnode = inst_config.primary_node
1030
      if pnode in node_info:
1031
        node_info[pnode]['pinst'].append(instance)
1032
      elif pnode not in n_offline:
1033
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1034
                    " %s failed" % (instance, pnode))
1035
        bad = True
1036

    
1037
      if pnode in n_offline:
1038
        inst_nodes_offline.append(pnode)
1039

    
1040
      # If the instance is non-redundant we cannot survive losing its primary
1041
      # node, so we are not N+1 compliant. On the other hand we have no disk
1042
      # templates with more than one secondary so that situation is not well
1043
      # supported either.
1044
      # FIXME: does not support file-backed instances
1045
      if len(inst_config.secondary_nodes) == 0:
1046
        i_non_redundant.append(instance)
1047
      elif len(inst_config.secondary_nodes) > 1:
1048
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1049
                    % instance)
1050

    
1051
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1052
        i_non_a_balanced.append(instance)
1053

    
1054
      for snode in inst_config.secondary_nodes:
1055
        if snode in node_info:
1056
          node_info[snode]['sinst'].append(instance)
1057
          if pnode not in node_info[snode]['sinst-by-pnode']:
1058
            node_info[snode]['sinst-by-pnode'][pnode] = []
1059
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1060
        elif snode not in n_offline:
1061
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1062
                      " %s failed" % (instance, snode))
1063
          bad = True
1064
        if snode in n_offline:
1065
          inst_nodes_offline.append(snode)
1066

    
1067
      if inst_nodes_offline:
1068
        # warn that the instance lives on offline nodes, and set bad=True
1069
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1070
                    ", ".join(inst_nodes_offline))
1071
        bad = True
1072

    
1073
    feedback_fn("* Verifying orphan volumes")
1074
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1075
                                       feedback_fn)
1076
    bad = bad or result
1077

    
1078
    feedback_fn("* Verifying remaining instances")
1079
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1080
                                         feedback_fn)
1081
    bad = bad or result
1082

    
1083
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1084
      feedback_fn("* Verifying N+1 Memory redundancy")
1085
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1086
      bad = bad or result
1087

    
1088
    feedback_fn("* Other Notes")
1089
    if i_non_redundant:
1090
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1091
                  % len(i_non_redundant))
1092

    
1093
    if i_non_a_balanced:
1094
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1095
                  % len(i_non_a_balanced))
1096

    
1097
    if n_offline:
1098
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1099

    
1100
    return not bad
1101

    
1102
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1103
    """Analize the post-hooks' result
1104

1105
    This method analyses the hook result, handles it, and sends some
1106
    nicely-formatted feedback back to the user.
1107

1108
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1109
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1110
    @param hooks_results: the results of the multi-node hooks rpc call
1111
    @param feedback_fn: function used to send feedback back to the caller
1112
    @param lu_result: previous Exec result
1113
    @return: the new Exec result, based on the previous result
1114
        and hook results
1115

1116
    """
1117
    # We only really run POST phase hooks, and are only interested in
1118
    # their results
1119
    if phase == constants.HOOKS_PHASE_POST:
1120
      # Used to change hooks' output to proper indentation
1121
      indent_re = re.compile('^', re.M)
1122
      feedback_fn("* Hooks Results")
1123
      if not hooks_results:
1124
        feedback_fn("  - ERROR: general communication failure")
1125
        lu_result = 1
1126
      else:
1127
        for node_name in hooks_results:
1128
          show_node_header = True
1129
          res = hooks_results[node_name]
1130
          if res.failed or res.data is False or not isinstance(res.data, list):
1131
            if res.offline:
1132
              # no need to warn or set fail return value
1133
              continue
1134
            feedback_fn("    Communication failure in hooks execution")
1135
            lu_result = 1
1136
            continue
1137
          for script, hkr, output in res.data:
1138
            if hkr == constants.HKR_FAIL:
1139
              # The node header is only shown once, if there are
1140
              # failing hooks on that node
1141
              if show_node_header:
1142
                feedback_fn("  Node %s:" % node_name)
1143
                show_node_header = False
1144
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1145
              output = indent_re.sub('      ', output)
1146
              feedback_fn("%s" % output)
1147
              lu_result = 1
1148

    
1149
      return lu_result
1150

    
1151

    
1152
class LUVerifyDisks(NoHooksLU):
1153
  """Verifies the cluster disks status.
1154

1155
  """
1156
  _OP_REQP = []
1157
  REQ_BGL = False
1158

    
1159
  def ExpandNames(self):
1160
    self.needed_locks = {
1161
      locking.LEVEL_NODE: locking.ALL_SET,
1162
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1163
    }
1164
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1165

    
1166
  def CheckPrereq(self):
1167
    """Check prerequisites.
1168

1169
    This has no prerequisites.
1170

1171
    """
1172
    pass
1173

    
1174
  def Exec(self, feedback_fn):
1175
    """Verify integrity of cluster disks.
1176

1177
    """
1178
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1179

    
1180
    vg_name = self.cfg.GetVGName()
1181
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1182
    instances = [self.cfg.GetInstanceInfo(name)
1183
                 for name in self.cfg.GetInstanceList()]
1184

    
1185
    nv_dict = {}
1186
    for inst in instances:
1187
      inst_lvs = {}
1188
      if (not inst.admin_up or
1189
          inst.disk_template not in constants.DTS_NET_MIRROR):
1190
        continue
1191
      inst.MapLVsByNode(inst_lvs)
1192
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1193
      for node, vol_list in inst_lvs.iteritems():
1194
        for vol in vol_list:
1195
          nv_dict[(node, vol)] = inst
1196

    
1197
    if not nv_dict:
1198
      return result
1199

    
1200
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1201

    
1202
    to_act = set()
1203
    for node in nodes:
1204
      # node_volume
1205
      lvs = node_lvs[node]
1206
      if lvs.failed:
1207
        if not lvs.offline:
1208
          self.LogWarning("Connection to node %s failed: %s" %
1209
                          (node, lvs.data))
1210
        continue
1211
      lvs = lvs.data
1212
      if isinstance(lvs, basestring):
1213
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1214
        res_nlvm[node] = lvs
1215
      elif not isinstance(lvs, dict):
1216
        logging.warning("Connection to node %s failed or invalid data"
1217
                        " returned", node)
1218
        res_nodes.append(node)
1219
        continue
1220

    
1221
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1222
        inst = nv_dict.pop((node, lv_name), None)
1223
        if (not lv_online and inst is not None
1224
            and inst.name not in res_instances):
1225
          res_instances.append(inst.name)
1226

    
1227
    # any leftover items in nv_dict are missing LVs, let's arrange the
1228
    # data better
1229
    for key, inst in nv_dict.iteritems():
1230
      if inst.name not in res_missing:
1231
        res_missing[inst.name] = []
1232
      res_missing[inst.name].append(key)
1233

    
1234
    return result
1235

    
1236

    
1237
class LURenameCluster(LogicalUnit):
1238
  """Rename the cluster.
1239

1240
  """
1241
  HPATH = "cluster-rename"
1242
  HTYPE = constants.HTYPE_CLUSTER
1243
  _OP_REQP = ["name"]
1244

    
1245
  def BuildHooksEnv(self):
1246
    """Build hooks env.
1247

1248
    """
1249
    env = {
1250
      "OP_TARGET": self.cfg.GetClusterName(),
1251
      "NEW_NAME": self.op.name,
1252
      }
1253
    mn = self.cfg.GetMasterNode()
1254
    return env, [mn], [mn]
1255

    
1256
  def CheckPrereq(self):
1257
    """Verify that the passed name is a valid one.
1258

1259
    """
1260
    hostname = utils.HostInfo(self.op.name)
1261

    
1262
    new_name = hostname.name
1263
    self.ip = new_ip = hostname.ip
1264
    old_name = self.cfg.GetClusterName()
1265
    old_ip = self.cfg.GetMasterIP()
1266
    if new_name == old_name and new_ip == old_ip:
1267
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1268
                                 " cluster has changed")
1269
    if new_ip != old_ip:
1270
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1271
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1272
                                   " reachable on the network. Aborting." %
1273
                                   new_ip)
1274

    
1275
    self.op.name = new_name
1276

    
1277
  def Exec(self, feedback_fn):
1278
    """Rename the cluster.
1279

1280
    """
1281
    clustername = self.op.name
1282
    ip = self.ip
1283

    
1284
    # shutdown the master IP
1285
    master = self.cfg.GetMasterNode()
1286
    result = self.rpc.call_node_stop_master(master, False)
1287
    if result.failed or not result.data:
1288
      raise errors.OpExecError("Could not disable the master role")
1289

    
1290
    try:
1291
      cluster = self.cfg.GetClusterInfo()
1292
      cluster.cluster_name = clustername
1293
      cluster.master_ip = ip
1294
      self.cfg.Update(cluster)
1295

    
1296
      # update the known hosts file
1297
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1298
      node_list = self.cfg.GetNodeList()
1299
      try:
1300
        node_list.remove(master)
1301
      except ValueError:
1302
        pass
1303
      result = self.rpc.call_upload_file(node_list,
1304
                                         constants.SSH_KNOWN_HOSTS_FILE)
1305
      for to_node, to_result in result.iteritems():
1306
        if to_result.failed or not to_result.data:
1307
          logging.error("Copy of file %s to node %s failed",
1308
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1309

    
1310
    finally:
1311
      result = self.rpc.call_node_start_master(master, False)
1312
      if result.failed or not result.data:
1313
        self.LogWarning("Could not re-enable the master role on"
1314
                        " the master, please restart manually.")
1315

    
1316

    
1317
def _RecursiveCheckIfLVMBased(disk):
1318
  """Check if the given disk or its children are lvm-based.
1319

1320
  @type disk: L{objects.Disk}
1321
  @param disk: the disk to check
1322
  @rtype: boolean
1323
  @return: boolean indicating whether an LD_LV dev_type was found or not
1324

1325
  """
1326
  if disk.children:
1327
    for chdisk in disk.children:
1328
      if _RecursiveCheckIfLVMBased(chdisk):
1329
        return True
1330
  return disk.dev_type == constants.LD_LV
1331

    
1332

    
1333
class LUSetClusterParams(LogicalUnit):
1334
  """Change the parameters of the cluster.
1335

1336
  """
1337
  HPATH = "cluster-modify"
1338
  HTYPE = constants.HTYPE_CLUSTER
1339
  _OP_REQP = []
1340
  REQ_BGL = False
1341

    
1342
  def CheckParameters(self):
1343
    """Check parameters
1344

1345
    """
1346
    if not hasattr(self.op, "candidate_pool_size"):
1347
      self.op.candidate_pool_size = None
1348
    if self.op.candidate_pool_size is not None:
1349
      try:
1350
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1351
      except ValueError, err:
1352
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1353
                                   str(err))
1354
      if self.op.candidate_pool_size < 1:
1355
        raise errors.OpPrereqError("At least one master candidate needed")
1356

    
1357
  def ExpandNames(self):
1358
    # FIXME: in the future, modifying other cluster params may not require
1359
    # checking on all nodes.
1360
    self.needed_locks = {
1361
      locking.LEVEL_NODE: locking.ALL_SET,
1362
    }
1363
    self.share_locks[locking.LEVEL_NODE] = 1
1364

    
1365
  def BuildHooksEnv(self):
1366
    """Build hooks env.
1367

1368
    """
1369
    env = {
1370
      "OP_TARGET": self.cfg.GetClusterName(),
1371
      "NEW_VG_NAME": self.op.vg_name,
1372
      }
1373
    mn = self.cfg.GetMasterNode()
1374
    return env, [mn], [mn]
1375

    
1376
  def CheckPrereq(self):
1377
    """Check prerequisites.
1378

1379
    This checks that the given params don't conflict and
1380
    that the given volume group is valid.
1381

1382
    """
1383
    # FIXME: This only works because there is only one parameter that can be
1384
    # changed or removed.
1385
    if self.op.vg_name is not None and not self.op.vg_name:
1386
      instances = self.cfg.GetAllInstancesInfo().values()
1387
      for inst in instances:
1388
        for disk in inst.disks:
1389
          if _RecursiveCheckIfLVMBased(disk):
1390
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1391
                                       " lvm-based instances exist")
1392

    
1393
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1394

    
1395
    # if vg_name is not None, check the given volume group on all nodes
1396
    if self.op.vg_name:
1397
      vglist = self.rpc.call_vg_list(node_list)
1398
      for node in node_list:
1399
        if vglist[node].failed:
1400
          # ignoring down node
1401
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1402
          continue
1403
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1404
                                              self.op.vg_name,
1405
                                              constants.MIN_VG_SIZE)
1406
        if vgstatus:
1407
          raise errors.OpPrereqError("Error on node '%s': %s" %
1408
                                     (node, vgstatus))
1409

    
1410
    self.cluster = cluster = self.cfg.GetClusterInfo()
1411
    # validate beparams changes
1412
    if self.op.beparams:
1413
      utils.CheckBEParams(self.op.beparams)
1414
      self.new_beparams = cluster.FillDict(
1415
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1416

    
1417
    # hypervisor list/parameters
1418
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1419
    if self.op.hvparams:
1420
      if not isinstance(self.op.hvparams, dict):
1421
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1422
      for hv_name, hv_dict in self.op.hvparams.items():
1423
        if hv_name not in self.new_hvparams:
1424
          self.new_hvparams[hv_name] = hv_dict
1425
        else:
1426
          self.new_hvparams[hv_name].update(hv_dict)
1427

    
1428
    if self.op.enabled_hypervisors is not None:
1429
      self.hv_list = self.op.enabled_hypervisors
1430
    else:
1431
      self.hv_list = cluster.enabled_hypervisors
1432

    
1433
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1434
      # either the enabled list has changed, or the parameters have, validate
1435
      for hv_name, hv_params in self.new_hvparams.items():
1436
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1437
            (self.op.enabled_hypervisors and
1438
             hv_name in self.op.enabled_hypervisors)):
1439
          # either this is a new hypervisor, or its parameters have changed
1440
          hv_class = hypervisor.GetHypervisor(hv_name)
1441
          hv_class.CheckParameterSyntax(hv_params)
1442
          _CheckHVParams(self, node_list, hv_name, hv_params)
1443

    
1444
  def Exec(self, feedback_fn):
1445
    """Change the parameters of the cluster.
1446

1447
    """
1448
    if self.op.vg_name is not None:
1449
      if self.op.vg_name != self.cfg.GetVGName():
1450
        self.cfg.SetVGName(self.op.vg_name)
1451
      else:
1452
        feedback_fn("Cluster LVM configuration already in desired"
1453
                    " state, not changing")
1454
    if self.op.hvparams:
1455
      self.cluster.hvparams = self.new_hvparams
1456
    if self.op.enabled_hypervisors is not None:
1457
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1458
    if self.op.beparams:
1459
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1460
    if self.op.candidate_pool_size is not None:
1461
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1462

    
1463
    self.cfg.Update(self.cluster)
1464

    
1465
    # we want to update nodes after the cluster so that if any errors
1466
    # happen, we have recorded and saved the cluster info
1467
    if self.op.candidate_pool_size is not None:
1468
      _AdjustCandidatePool(self)
1469

    
1470

    
1471
class LURedistributeConfig(NoHooksLU):
1472
  """Force the redistribution of cluster configuration.
1473

1474
  This is a very simple LU.
1475

1476
  """
1477
  _OP_REQP = []
1478
  REQ_BGL = False
1479

    
1480
  def ExpandNames(self):
1481
    self.needed_locks = {
1482
      locking.LEVEL_NODE: locking.ALL_SET,
1483
    }
1484
    self.share_locks[locking.LEVEL_NODE] = 1
1485

    
1486
  def CheckPrereq(self):
1487
    """Check prerequisites.
1488

1489
    """
1490

    
1491
  def Exec(self, feedback_fn):
1492
    """Redistribute the configuration.
1493

1494
    """
1495
    self.cfg.Update(self.cfg.GetClusterInfo())
1496

    
1497

    
1498
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1499
  """Sleep and poll for an instance's disk to sync.
1500

1501
  """
1502
  if not instance.disks:
1503
    return True
1504

    
1505
  if not oneshot:
1506
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1507

    
1508
  node = instance.primary_node
1509

    
1510
  for dev in instance.disks:
1511
    lu.cfg.SetDiskID(dev, node)
1512

    
1513
  retries = 0
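  # poll the primary node for mirror status; up to 10 consecutive RPC
  # failures are tolerated before the wait is aborted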
1514
  while True:
1515
    max_time = 0
1516
    done = True
1517
    cumul_degraded = False
1518
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1519
    if rstats.failed or not rstats.data:
1520
      lu.LogWarning("Can't get any data from node %s", node)
1521
      retries += 1
1522
      if retries >= 10:
1523
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1524
                                 " aborting." % node)
1525
      time.sleep(6)
1526
      continue
1527
    rstats = rstats.data
1528
    retries = 0
1529
    for i, mstat in enumerate(rstats):
1530
      if mstat is None:
1531
        lu.LogWarning("Can't compute data for node %s/%s",
1532
                           node, instance.disks[i].iv_name)
1533
        continue
1534
      # we ignore the ldisk parameter
1535
      perc_done, est_time, is_degraded, _ = mstat
1536
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1537
      if perc_done is not None:
1538
        done = False
1539
        if est_time is not None:
1540
          rem_time = "%d estimated seconds remaining" % est_time
1541
          max_time = est_time
1542
        else:
1543
          rem_time = "no time estimate"
1544
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1545
                        (instance.disks[i].iv_name, perc_done, rem_time))
1546
    if done or oneshot:
1547
      break
1548

    
1549
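    # sleep before polling again, at most 60 seconds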
    time.sleep(min(60, max_time))
1550

    
1551
  if done:
1552
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1553
  return not cumul_degraded
1554

    
1555

    
1556
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1557
  """Check that mirrors are not degraded.
1558

1559
  The ldisk parameter, if True, will change the test from the
1560
  is_degraded attribute (which represents overall non-ok status for
1561
  the device(s)) to the ldisk (representing the local storage status).
1562

1563
  """
1564
  lu.cfg.SetDiskID(dev, node)
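  # index into the status tuple returned by call_blockdev_find: field 5 is
  # the overall is_degraded flag, field 6 the local disk (ldisk) status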
1565
  if ldisk:
1566
    idx = 6
1567
  else:
1568
    idx = 5
1569

    
1570
  result = True
1571
  if on_primary or dev.AssembleOnSecondary():
1572
    rstats = lu.rpc.call_blockdev_find(node, dev)
1573
    if rstats.failed or not rstats.data:
1574
      logging.warning("Node %s: disk degraded, not found or node down", node)
1575
      result = False
1576
    else:
1577
      result = result and (not rstats.data[idx])
1578
  if dev.children:
1579
    for child in dev.children:
1580
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1581

    
1582
  return result
1583

    
1584

    
1585
class LUDiagnoseOS(NoHooksLU):
1586
  """Logical unit for OS diagnose/query.
1587

1588
  """
1589
  _OP_REQP = ["output_fields", "names"]
1590
  REQ_BGL = False
1591
  _FIELDS_STATIC = utils.FieldSet()
1592
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1593

    
1594
  def ExpandNames(self):
1595
    if self.op.names:
1596
      raise errors.OpPrereqError("Selective OS query not supported")
1597

    
1598
    _CheckOutputFields(static=self._FIELDS_STATIC,
1599
                       dynamic=self._FIELDS_DYNAMIC,
1600
                       selected=self.op.output_fields)
1601

    
1602
    # Lock all nodes, in shared mode
1603
    self.needed_locks = {}
1604
    self.share_locks[locking.LEVEL_NODE] = 1
1605
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1606

    
1607
  def CheckPrereq(self):
1608
    """Check prerequisites.
1609

1610
    """
1611

    
1612
  @staticmethod
1613
  def _DiagnoseByOS(node_list, rlist):
1614
    """Remaps a per-node return list into an a per-os per-node dictionary
1615

1616
    @param node_list: a list with the names of all nodes
1617
    @param rlist: a map with node names as keys and OS objects as values
1618

1619
    @rtype: dict
1620
    @return: a dictionary with osnames as keys and as value another map, with
1621
        nodes as keys and list of OS objects as values, eg::
1622

1623
          {"debian-etch": {"node1": [<object>,...],
1624
                           "node2": [<object>,]}
1625
          }
1626

1627
    """
1628
    all_os = {}
1629
    for node_name, nr in rlist.iteritems():
1630
      if nr.failed or not nr.data:
1631
        continue
1632
      for os_obj in nr.data:
1633
        if os_obj.name not in all_os:
1634
          # build a list of nodes for this os containing empty lists
1635
          # for each node in node_list
1636
          all_os[os_obj.name] = {}
1637
          for nname in node_list:
1638
            all_os[os_obj.name][nname] = []
1639
        all_os[os_obj.name][node_name].append(os_obj)
1640
    return all_os
1641

    
1642
  def Exec(self, feedback_fn):
1643
    """Compute the list of OSes.
1644

1645
    """
1646
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1647
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1648
                   if node in node_list]
1649
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1650
    if node_data == False:
1651
      raise errors.OpExecError("Can't gather the list of OSes")
1652
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1653
    output = []
1654
    for os_name, os_data in pol.iteritems():
1655
      row = []
1656
      for field in self.op.output_fields:
1657
        if field == "name":
1658
          val = os_name
1659
        elif field == "valid":
1660
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1661
        elif field == "node_status":
1662
          val = {}
1663
          for node_name, nos_list in os_data.iteritems():
1664
            val[node_name] = [(v.status, v.path) for v in nos_list]
1665
        else:
1666
          raise errors.ParameterError(field)
1667
        row.append(val)
1668
      output.append(row)
1669

    
1670
    return output
1671

    
1672

    
1673
class LURemoveNode(LogicalUnit):
1674
  """Logical unit for removing a node.
1675

1676
  """
1677
  HPATH = "node-remove"
1678
  HTYPE = constants.HTYPE_NODE
1679
  _OP_REQP = ["node_name"]
1680

    
1681
  def BuildHooksEnv(self):
1682
    """Build hooks env.
1683

1684
    This doesn't run on the target node in the pre phase as a failed
1685
    node would then be impossible to remove.
1686

1687
    """
1688
    env = {
1689
      "OP_TARGET": self.op.node_name,
1690
      "NODE_NAME": self.op.node_name,
1691
      }
1692
    all_nodes = self.cfg.GetNodeList()
1693
    all_nodes.remove(self.op.node_name)
1694
    return env, all_nodes, all_nodes
1695

    
1696
  def CheckPrereq(self):
1697
    """Check prerequisites.
1698

1699
    This checks:
1700
     - the node exists in the configuration
1701
     - it does not have primary or secondary instances
1702
     - it's not the master
1703

1704
    Any errors are signalled by raising errors.OpPrereqError.
1705

1706
    """
1707
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1708
    if node is None:
1709
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1710

    
1711
    instance_list = self.cfg.GetInstanceList()
1712

    
1713
    masternode = self.cfg.GetMasterNode()
1714
    if node.name == masternode:
1715
      raise errors.OpPrereqError("Node is the master node,"
1716
                                 " you need to failover first.")
1717

    
1718
    for instance_name in instance_list:
1719
      instance = self.cfg.GetInstanceInfo(instance_name)
1720
      if node.name in instance.all_nodes:
1721
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1722
                                   " please remove first." % instance_name)
1723
    self.op.node_name = node.name
1724
    self.node = node
1725

    
1726
  def Exec(self, feedback_fn):
1727
    """Removes the node from the cluster.
1728

1729
    """
1730
    node = self.node
1731
    logging.info("Stopping the node daemon and removing configs from node %s",
1732
                 node.name)
1733

    
1734
    self.context.RemoveNode(node.name)
1735

    
1736
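    # the node-side cleanup (stopping the daemon, removing configuration
    # files) is handled by the leave-cluster RPC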
    self.rpc.call_node_leave_cluster(node.name)
1737

    
1738
    # Promote nodes to master candidate as needed
1739
    _AdjustCandidatePool(self)
1740

    
1741

    
1742
class LUQueryNodes(NoHooksLU):
1743
  """Logical unit for querying nodes.
1744

1745
  """
1746
  _OP_REQP = ["output_fields", "names"]
1747
  REQ_BGL = False
1748
  _FIELDS_DYNAMIC = utils.FieldSet(
1749
    "dtotal", "dfree",
1750
    "mtotal", "mnode", "mfree",
1751
    "bootid",
1752
    "ctotal",
1753
    )
1754

    
1755
  _FIELDS_STATIC = utils.FieldSet(
1756
    "name", "pinst_cnt", "sinst_cnt",
1757
    "pinst_list", "sinst_list",
1758
    "pip", "sip", "tags",
1759
    "serial_no",
1760
    "master_candidate",
1761
    "master",
1762
    "offline",
1763
    )
1764

    
1765
  def ExpandNames(self):
1766
    _CheckOutputFields(static=self._FIELDS_STATIC,
1767
                       dynamic=self._FIELDS_DYNAMIC,
1768
                       selected=self.op.output_fields)
1769

    
1770
    self.needed_locks = {}
1771
    self.share_locks[locking.LEVEL_NODE] = 1
1772

    
1773
    if self.op.names:
1774
      self.wanted = _GetWantedNodes(self, self.op.names)
1775
    else:
1776
      self.wanted = locking.ALL_SET
1777

    
1778
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1779
    if self.do_locking:
1780
      # if we don't request only static fields, we need to lock the nodes
1781
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1782

    
1783

    
1784
  def CheckPrereq(self):
1785
    """Check prerequisites.
1786

1787
    """
1788
    # The validation of the node list is done in the _GetWantedNodes,
1789
    # if non empty, and if empty, there's no validation to do
1790
    pass
1791

    
1792
  def Exec(self, feedback_fn):
1793
    """Computes the list of nodes and their attributes.
1794

1795
    """
1796
    all_info = self.cfg.GetAllNodesInfo()
1797
    if self.do_locking:
1798
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1799
    elif self.wanted != locking.ALL_SET:
1800
      nodenames = self.wanted
1801
      missing = set(nodenames).difference(all_info.keys())
1802
      if missing:
1803
        raise errors.OpExecError(
1804
          "Some nodes were removed before retrieving their data: %s" % missing)
1805
    else:
1806
      nodenames = all_info.keys()
1807

    
1808
    nodenames = utils.NiceSort(nodenames)
1809
    nodelist = [all_info[name] for name in nodenames]
1810

    
1811
    # begin data gathering
1812

    
1813
    if self.do_locking:
1814
      live_data = {}
1815
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1816
                                          self.cfg.GetHypervisorType())
1817
      for name in nodenames:
1818
        nodeinfo = node_data[name]
1819
        if not nodeinfo.failed and nodeinfo.data:
1820
          nodeinfo = nodeinfo.data
1821
          fn = utils.TryConvert
1822
          live_data[name] = {
1823
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1824
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1825
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1826
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1827
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1828
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1829
            "bootid": nodeinfo.get('bootid', None),
1830
            }
1831
        else:
1832
          live_data[name] = {}
1833
    else:
1834
      live_data = dict.fromkeys(nodenames, {})
1835

    
1836
    node_to_primary = dict([(name, set()) for name in nodenames])
1837
    node_to_secondary = dict([(name, set()) for name in nodenames])
1838

    
1839
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1840
                             "sinst_cnt", "sinst_list"))
1841
    if inst_fields & frozenset(self.op.output_fields):
1842
      instancelist = self.cfg.GetInstanceList()
1843

    
1844
      for instance_name in instancelist:
1845
        inst = self.cfg.GetInstanceInfo(instance_name)
1846
        if inst.primary_node in node_to_primary:
1847
          node_to_primary[inst.primary_node].add(inst.name)
1848
        for secnode in inst.secondary_nodes:
1849
          if secnode in node_to_secondary:
1850
            node_to_secondary[secnode].add(inst.name)
1851

    
1852
    master_node = self.cfg.GetMasterNode()
1853

    
1854
    # end data gathering
1855

    
1856
    output = []
1857
    for node in nodelist:
1858
      node_output = []
1859
      for field in self.op.output_fields:
1860
        if field == "name":
1861
          val = node.name
1862
        elif field == "pinst_list":
1863
          val = list(node_to_primary[node.name])
1864
        elif field == "sinst_list":
1865
          val = list(node_to_secondary[node.name])
1866
        elif field == "pinst_cnt":
1867
          val = len(node_to_primary[node.name])
1868
        elif field == "sinst_cnt":
1869
          val = len(node_to_secondary[node.name])
1870
        elif field == "pip":
1871
          val = node.primary_ip
1872
        elif field == "sip":
1873
          val = node.secondary_ip
1874
        elif field == "tags":
1875
          val = list(node.GetTags())
1876
        elif field == "serial_no":
1877
          val = node.serial_no
1878
        elif field == "master_candidate":
1879
          val = node.master_candidate
1880
        elif field == "master":
1881
          val = node.name == master_node
1882
        elif field == "offline":
1883
          val = node.offline
1884
        elif self._FIELDS_DYNAMIC.Matches(field):
1885
          val = live_data[node.name].get(field, None)
1886
        else:
1887
          raise errors.ParameterError(field)
1888
        node_output.append(val)
1889
      output.append(node_output)
1890

    
1891
    return output
1892

    
1893

    
1894
class LUQueryNodeVolumes(NoHooksLU):
1895
  """Logical unit for getting volumes on node(s).
1896

1897
  """
1898
  _OP_REQP = ["nodes", "output_fields"]
1899
  REQ_BGL = False
1900
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1901
  _FIELDS_STATIC = utils.FieldSet("node")
1902

    
1903
  def ExpandNames(self):
1904
    _CheckOutputFields(static=self._FIELDS_STATIC,
1905
                       dynamic=self._FIELDS_DYNAMIC,
1906
                       selected=self.op.output_fields)
1907

    
1908
    self.needed_locks = {}
1909
    self.share_locks[locking.LEVEL_NODE] = 1
1910
    if not self.op.nodes:
1911
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1912
    else:
1913
      self.needed_locks[locking.LEVEL_NODE] = \
1914
        _GetWantedNodes(self, self.op.nodes)
1915

    
1916
  def CheckPrereq(self):
1917
    """Check prerequisites.
1918

1919
    This checks that the fields required are valid output fields.
1920

1921
    """
1922
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1923

    
1924
  def Exec(self, feedback_fn):
1925
    """Computes the list of nodes and their attributes.
1926

1927
    """
1928
    nodenames = self.nodes
1929
    volumes = self.rpc.call_node_volumes(nodenames)
1930

    
1931
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1932
             in self.cfg.GetInstanceList()]
1933

    
1934
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1935

    
1936
    output = []
1937
    for node in nodenames:
1938
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1939
        continue
1940

    
1941
      node_vols = volumes[node].data[:]
1942
      node_vols.sort(key=lambda vol: vol['dev'])
1943

    
1944
      for vol in node_vols:
1945
        node_output = []
1946
        for field in self.op.output_fields:
1947
          if field == "node":
1948
            val = node
1949
          elif field == "phys":
1950
            val = vol['dev']
1951
          elif field == "vg":
1952
            val = vol['vg']
1953
          elif field == "name":
1954
            val = vol['name']
1955
          elif field == "size":
1956
            val = int(float(vol['size']))
1957
          elif field == "instance":
1958
            for inst in ilist:
1959
              if node not in lv_by_node[inst]:
1960
                continue
1961
              if vol['name'] in lv_by_node[inst][node]:
1962
                val = inst.name
1963
                break
1964
            else:
1965
              val = '-'
1966
          else:
1967
            raise errors.ParameterError(field)
1968
          node_output.append(str(val))
1969

    
1970
        output.append(node_output)
1971

    
1972
    return output
1973

    
1974

    
1975
class LUAddNode(LogicalUnit):
1976
  """Logical unit for adding node to the cluster.
1977

1978
  """
1979
  HPATH = "node-add"
1980
  HTYPE = constants.HTYPE_NODE
1981
  _OP_REQP = ["node_name"]
1982

    
1983
  def BuildHooksEnv(self):
1984
    """Build hooks env.
1985

1986
    This will run on all nodes before, and on all nodes + the new node after.
1987

1988
    """
1989
    env = {
1990
      "OP_TARGET": self.op.node_name,
1991
      "NODE_NAME": self.op.node_name,
1992
      "NODE_PIP": self.op.primary_ip,
1993
      "NODE_SIP": self.op.secondary_ip,
1994
      }
1995
    nodes_0 = self.cfg.GetNodeList()
1996
    nodes_1 = nodes_0 + [self.op.node_name, ]
1997
    return env, nodes_0, nodes_1
1998

    
1999
  def CheckPrereq(self):
2000
    """Check prerequisites.
2001

2002
    This checks:
2003
     - the new node is not already in the config
2004
     - it is resolvable
2005
     - its parameters (single/dual homed) matches the cluster
2006

2007
    Any errors are signalled by raising errors.OpPrereqError.
2008

2009
    """
2010
    node_name = self.op.node_name
2011
    cfg = self.cfg
2012

    
2013
    dns_data = utils.HostInfo(node_name)
2014

    
2015
    node = dns_data.name
2016
    primary_ip = self.op.primary_ip = dns_data.ip
2017
    secondary_ip = getattr(self.op, "secondary_ip", None)
2018
    if secondary_ip is None:
2019
      secondary_ip = primary_ip
2020
    if not utils.IsValidIP(secondary_ip):
2021
      raise errors.OpPrereqError("Invalid secondary IP given")
2022
    self.op.secondary_ip = secondary_ip
2023

    
2024
    node_list = cfg.GetNodeList()
2025
    if not self.op.readd and node in node_list:
2026
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2027
                                 node)
2028
    elif self.op.readd and node not in node_list:
2029
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2030

    
2031
    for existing_node_name in node_list:
2032
      existing_node = cfg.GetNodeInfo(existing_node_name)
2033

    
2034
      if self.op.readd and node == existing_node_name:
2035
        if (existing_node.primary_ip != primary_ip or
2036
            existing_node.secondary_ip != secondary_ip):
2037
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2038
                                     " address configuration as before")
2039
        continue
2040

    
2041
      if (existing_node.primary_ip == primary_ip or
2042
          existing_node.secondary_ip == primary_ip or
2043
          existing_node.primary_ip == secondary_ip or
2044
          existing_node.secondary_ip == secondary_ip):
2045
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2046
                                   " existing node %s" % existing_node.name)
2047

    
2048
    # check that the type of the node (single versus dual homed) is the
2049
    # same as for the master
2050
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2051
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2052
    newbie_singlehomed = secondary_ip == primary_ip
2053
    if master_singlehomed != newbie_singlehomed:
2054
      if master_singlehomed:
2055
        raise errors.OpPrereqError("The master has no private ip but the"
2056
                                   " new node has one")
2057
      else:
2058
        raise errors.OpPrereqError("The master has a private ip but the"
2059
                                   " new node doesn't have one")
2060

    
2061
    # check reachability
2062
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2063
      raise errors.OpPrereqError("Node not reachable by ping")
2064

    
2065
    if not newbie_singlehomed:
2066
      # check reachability from my secondary ip to newbie's secondary ip
2067
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2068
                           source=myself.secondary_ip):
2069
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2070
                                   " based ping to noded port")
2071

    
2072
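    # the new node becomes a master candidate only if the candidate pool is
    # not yet full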
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2073
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2074
    master_candidate = mc_now < cp_size
2075

    
2076
    self.new_node = objects.Node(name=node,
2077
                                 primary_ip=primary_ip,
2078
                                 secondary_ip=secondary_ip,
2079
                                 master_candidate=master_candidate,
2080
                                 offline=False)
2081

    
2082
  def Exec(self, feedback_fn):
2083
    """Adds the new node to the cluster.
2084

2085
    """
2086
    new_node = self.new_node
2087
    node = new_node.name
2088

    
2089
    # check connectivity
2090
    result = self.rpc.call_version([node])[node]
2091
    result.Raise()
2092
    if result.data:
2093
      if constants.PROTOCOL_VERSION == result.data:
2094
        logging.info("Communication to node %s fine, sw version %s match",
2095
                     node, result.data)
2096
      else:
2097
        raise errors.OpExecError("Version mismatch master version %s,"
2098
                                 " node version %s" %
2099
                                 (constants.PROTOCOL_VERSION, result.data))
2100
    else:
2101
      raise errors.OpExecError("Cannot get version from the new node")
2102

    
2103
    # setup ssh on node
2104
    logging.info("Copy ssh key to node %s", node)
2105
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2106
    keyarray = []
2107
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2108
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2109
                priv_key, pub_key]
2110

    
2111
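    # read the key files locally so they can be shipped to the new node in a
    # single node_add RPC call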
    for i in keyfiles:
2112
      f = open(i, 'r')
2113
      try:
2114
        keyarray.append(f.read())
2115
      finally:
2116
        f.close()
2117

    
2118
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2119
                                    keyarray[2],
2120
                                    keyarray[3], keyarray[4], keyarray[5])
2121

    
2122
    if result.failed or not result.data:
2123
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2124

    
2125
    # Add node to our /etc/hosts, and add key to known_hosts
2126
    utils.AddHostToEtcHosts(new_node.name)
2127

    
2128
    if new_node.secondary_ip != new_node.primary_ip:
2129
      result = self.rpc.call_node_has_ip_address(new_node.name,
2130
                                                 new_node.secondary_ip)
2131
      if result.failed or not result.data:
2132
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2133
                                 " you gave (%s). Please fix and re-run this"
2134
                                 " command." % new_node.secondary_ip)
2135

    
2136
    node_verify_list = [self.cfg.GetMasterNode()]
2137
    node_verify_param = {
2138
      'nodelist': [node],
2139
      # TODO: do a node-net-test as well?
2140
    }
2141

    
2142
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2143
                                       self.cfg.GetClusterName())
2144
    for verifier in node_verify_list:
2145
      if result[verifier].failed or not result[verifier].data:
2146
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2147
                                 " for remote verification" % verifier)
2148
      if result[verifier].data['nodelist']:
2149
        for failed in result[verifier].data['nodelist']:
2150
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2151
                      (verifier, result[verifier].data['nodelist'][failed]))
2152
        raise errors.OpExecError("ssh/hostname verification failed.")
2153

    
2154
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2155
    # including the node just added
2156
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2157
    dist_nodes = self.cfg.GetNodeList()
2158
    if not self.op.readd:
2159
      dist_nodes.append(node)
2160
    if myself.name in dist_nodes:
2161
      dist_nodes.remove(myself.name)
2162

    
2163
    logging.debug("Copying hosts and known_hosts to all nodes")
2164
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2165
      result = self.rpc.call_upload_file(dist_nodes, fname)
2166
      for to_node, to_result in result.iteritems():
2167
        if to_result.failed or not to_result.data:
2168
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2169

    
2170
    to_copy = []
2171
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2172
      to_copy.append(constants.VNC_PASSWORD_FILE)
2173
    for fname in to_copy:
2174
      result = self.rpc.call_upload_file([node], fname)
2175
      if result[node].failed or not result[node].data:
2176
        logging.error("Could not copy file %s to node %s", fname, node)
2177

    
2178
    if self.op.readd:
2179
      self.context.ReaddNode(new_node)
2180
    else:
2181
      self.context.AddNode(new_node)
2182

    
2183

    
2184
class LUSetNodeParams(LogicalUnit):
2185
  """Modifies the parameters of a node.
2186

2187
  """
2188
  HPATH = "node-modify"
2189
  HTYPE = constants.HTYPE_NODE
2190
  _OP_REQP = ["node_name"]
2191
  REQ_BGL = False
2192

    
2193
  def CheckArguments(self):
2194
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2195
    if node_name is None:
2196
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2197
    self.op.node_name = node_name
2198
    _CheckBooleanOpField(self.op, 'master_candidate')
2199
    _CheckBooleanOpField(self.op, 'offline')
2200
    if self.op.master_candidate is None and self.op.offline is None:
2201
      raise errors.OpPrereqError("Please pass at least one modification")
2202
    if self.op.offline == True and self.op.master_candidate == True:
2203
      raise errors.OpPrereqError("Can't set the node into offline and"
2204
                                 " master_candidate at the same time")
2205

    
2206
  def ExpandNames(self):
2207
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2208

    
2209
  def BuildHooksEnv(self):
2210
    """Build hooks env.
2211

2212
    This runs on the master node.
2213

2214
    """
2215
    env = {
2216
      "OP_TARGET": self.op.node_name,
2217
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2218
      "OFFLINE": str(self.op.offline),
2219
      }
2220
    nl = [self.cfg.GetMasterNode(),
2221
          self.op.node_name]
2222
    return env, nl, nl
2223

    
2224
  def CheckPrereq(self):
2225
    """Check prerequisites.
2226

2227
    This checks that the requested node role changes (offline, master
    candidate) are consistent with the current cluster state.
2228

2229
    """
2230
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2231

    
2232
    if ((self.op.master_candidate == False or self.op.offline == True)
2233
        and node.master_candidate):
2234
      # we will demote the node from master_candidate
2235
      if self.op.node_name == self.cfg.GetMasterNode():
2236
        raise errors.OpPrereqError("The master node has to be a"
2237
                                   " master candidate and online")
2238
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2239
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2240
      if num_candidates <= cp_size:
2241
        msg = ("Not enough master candidates (desired"
2242
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2243
        if self.op.force:
2244
          self.LogWarning(msg)
2245
        else:
2246
          raise errors.OpPrereqError(msg)
2247

    
2248
    if (self.op.master_candidate == True and node.offline and
2249
        not self.op.offline == False):
2250
      raise errors.OpPrereqError("Can't set an offline node to"
2251
                                 " master_candidate")
2252

    
2253
    return
2254

    
2255
  def Exec(self, feedback_fn):
2256
    """Modifies a node.
2257

2258
    """
2259
    node = self.node
2260

    
2261
    result = []
2262

    
2263
    if self.op.offline is not None:
2264
      node.offline = self.op.offline
2265
      result.append(("offline", str(self.op.offline)))
2266
      if self.op.offline == True and node.master_candidate:
2267
        node.master_candidate = False
2268
        result.append(("master_candidate", "auto-demotion due to offline"))
2269

    
2270
    if self.op.master_candidate is not None:
2271
      node.master_candidate = self.op.master_candidate
2272
      result.append(("master_candidate", str(self.op.master_candidate)))
2273
      if self.op.master_candidate == False:
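        # tell the node to demote itself; failures here only produce a
        # warning, the configuration update below still takes place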
2274
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2275
        if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2276
            or len(rrc.data) != 2):
2277
          self.LogWarning("Node rpc error: %s" % rrc.error)
2278
        elif not rrc.data[0]:
2279
          self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2280

    
2281
    # this will trigger configuration file update, if needed
2282
    self.cfg.Update(node)
2283
    # this will trigger job queue propagation or cleanup
2284
    if self.op.node_name != self.cfg.GetMasterNode():
2285
      self.context.ReaddNode(node)
2286

    
2287
    return result
2288

    
2289

    
2290
class LUQueryClusterInfo(NoHooksLU):
2291
  """Query cluster configuration.
2292

2293
  """
2294
  _OP_REQP = []
2295
  REQ_BGL = False
2296

    
2297
  def ExpandNames(self):
2298
    self.needed_locks = {}
2299

    
2300
  def CheckPrereq(self):
2301
    """No prerequsites needed for this LU.
2302

2303
    """
2304
    pass
2305

    
2306
  def Exec(self, feedback_fn):
2307
    """Return cluster config.
2308

2309
    """
2310
    cluster = self.cfg.GetClusterInfo()
2311
    result = {
2312
      "software_version": constants.RELEASE_VERSION,
2313
      "protocol_version": constants.PROTOCOL_VERSION,
2314
      "config_version": constants.CONFIG_VERSION,
2315
      "os_api_version": constants.OS_API_VERSION,
2316
      "export_version": constants.EXPORT_VERSION,
2317
      "architecture": (platform.architecture()[0], platform.machine()),
2318
      "name": cluster.cluster_name,
2319
      "master": cluster.master_node,
2320
      "default_hypervisor": cluster.default_hypervisor,
2321
      "enabled_hypervisors": cluster.enabled_hypervisors,
2322
      "hvparams": cluster.hvparams,
2323
      "beparams": cluster.beparams,
2324
      "candidate_pool_size": cluster.candidate_pool_size,
2325
      }
2326

    
2327
    return result
2328

    
2329

    
2330
class LUQueryConfigValues(NoHooksLU):
2331
  """Return configuration values.
2332

2333
  """
2334
  _OP_REQP = []
2335
  REQ_BGL = False
2336
  _FIELDS_DYNAMIC = utils.FieldSet()
2337
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2338

    
2339
  def ExpandNames(self):
2340
    self.needed_locks = {}
2341

    
2342
    _CheckOutputFields(static=self._FIELDS_STATIC,
2343
                       dynamic=self._FIELDS_DYNAMIC,
2344
                       selected=self.op.output_fields)
2345

    
2346
  def CheckPrereq(self):
2347
    """No prerequisites.
2348

2349
    """
2350
    pass
2351

    
2352
  def Exec(self, feedback_fn):
2353
    """Dump a representation of the cluster config to the standard output.
2354

2355
    """
2356
    values = []
2357
    for field in self.op.output_fields:
2358
      if field == "cluster_name":
2359
        entry = self.cfg.GetClusterName()
2360
      elif field == "master_node":
2361
        entry = self.cfg.GetMasterNode()
2362
      elif field == "drain_flag":
2363
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2364
      else:
2365
        raise errors.ParameterError(field)
2366
      values.append(entry)
2367
    return values
2368

    
2369

    
2370
class LUActivateInstanceDisks(NoHooksLU):
2371
  """Bring up an instance's disks.
2372

2373
  """
2374
  _OP_REQP = ["instance_name"]
2375
  REQ_BGL = False
2376

    
2377
  def ExpandNames(self):
2378
    self._ExpandAndLockInstance()
2379
    self.needed_locks[locking.LEVEL_NODE] = []
2380
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2381

    
2382
  def DeclareLocks(self, level):
2383
    if level == locking.LEVEL_NODE:
2384
      self._LockInstancesNodes()
2385

    
2386
  def CheckPrereq(self):
2387
    """Check prerequisites.
2388

2389
    This checks that the instance is in the cluster.
2390

2391
    """
2392
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2393
    assert self.instance is not None, \
2394
      "Cannot retrieve locked instance %s" % self.op.instance_name
2395
    _CheckNodeOnline(self, self.instance.primary_node)
2396

    
2397
  def Exec(self, feedback_fn):
2398
    """Activate the disks.
2399

2400
    """
2401
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2402
    if not disks_ok:
2403
      raise errors.OpExecError("Cannot activate block devices")
2404

    
2405
    return disks_info
2406

    
2407

    
2408
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2409
  """Prepare the block devices for an instance.
2410

2411
  This sets up the block devices on all nodes.
2412

2413
  @type lu: L{LogicalUnit}
2414
  @param lu: the logical unit on whose behalf we execute
2415
  @type instance: L{objects.Instance}
2416
  @param instance: the instance for whose disks we assemble
2417
  @type ignore_secondaries: boolean
2418
  @param ignore_secondaries: if true, errors on secondary nodes
2419
      won't result in an error return from the function
2420
  @return: a tuple of (disks_ok, device_info); device_info is a list of
2421
      (host, instance_visible_name, node_visible_name)
2422
      with the mapping from node devices to instance devices
2423

2424
  """
2425
  device_info = []
2426
  disks_ok = True
2427
  iname = instance.name
2428
  # With the two passes mechanism we try to reduce the window of
2429
  # opportunity for the race condition of switching DRBD to primary
2430
  # before handshaking occurred, but we do not eliminate it
2431

    
2432
  # The proper fix would be to wait (with some limits) until the
2433
  # connection has been made and drbd transitions from WFConnection
2434
  # into any other network-connected state (Connected, SyncTarget,
2435
  # SyncSource, etc.)
2436

    
2437
  # 1st pass, assemble on all nodes in secondary mode
2438
  for inst_disk in instance.disks:
2439
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2440
      lu.cfg.SetDiskID(node_disk, node)
2441
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2442
      if result.failed or not result:
2443
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2444
                           " (is_primary=False, pass=1)",
2445
                           inst_disk.iv_name, node)
2446
        if not ignore_secondaries:
2447
          disks_ok = False
2448

    
2449
  # FIXME: race condition on drbd migration to primary
2450

    
2451
  # 2nd pass, do only the primary node
2452
  for inst_disk in instance.disks:
2453
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2454
      if node != instance.primary_node:
2455
        continue
2456
      lu.cfg.SetDiskID(node_disk, node)
2457
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2458
      if result.failed or not result:
2459
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2460
                           " (is_primary=True, pass=2)",
2461
                           inst_disk.iv_name, node)
2462
        disks_ok = False
2463
    device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2464

    
2465
  # leave the disks configured for the primary node
2466
  # this is a workaround that would be fixed better by
2467
  # improving the logical/physical id handling
2468
  for disk in instance.disks:
2469
    lu.cfg.SetDiskID(disk, instance.primary_node)
2470

    
2471
  return disks_ok, device_info
2472

    
2473

    
2474
def _StartInstanceDisks(lu, instance, force):
2475
  """Start the disks of an instance.
2476

2477
  """
2478
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2479
                                           ignore_secondaries=force)
2480
  if not disks_ok:
2481
    _ShutdownInstanceDisks(lu, instance)
2482
    if force is not None and not force:
2483
      lu.proc.LogWarning("", hint="If the message above refers to a"
2484
                         " secondary node,"
2485
                         " you can retry the operation using '--force'.")
2486
    raise errors.OpExecError("Disk consistency error")
2487

    
2488

    
2489
class LUDeactivateInstanceDisks(NoHooksLU):
2490
  """Shutdown an instance's disks.
2491

2492
  """
2493
  _OP_REQP = ["instance_name"]
2494
  REQ_BGL = False
2495

    
2496
  def ExpandNames(self):
2497
    self._ExpandAndLockInstance()
2498
    self.needed_locks[locking.LEVEL_NODE] = []
2499
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2500

    
2501
  def DeclareLocks(self, level):
2502
    if level == locking.LEVEL_NODE:
2503
      self._LockInstancesNodes()
2504

    
2505
  def CheckPrereq(self):
2506
    """Check prerequisites.
2507

2508
    This checks that the instance is in the cluster.
2509

2510
    """
2511
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2512
    assert self.instance is not None, \
2513
      "Cannot retrieve locked instance %s" % self.op.instance_name
2514

    
2515
  def Exec(self, feedback_fn):
2516
    """Deactivate the disks
2517

2518
    """
2519
    instance = self.instance
2520
    _SafeShutdownInstanceDisks(self, instance)
2521

    
2522

    
2523
def _SafeShutdownInstanceDisks(lu, instance):
2524
  """Shutdown block devices of an instance.
2525

2526
  This function checks if an instance is running, before calling
2527
  _ShutdownInstanceDisks.
2528

2529
  """
2530
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2531
                                      [instance.hypervisor])
2532
  ins_l = ins_l[instance.primary_node]
2533
  if ins_l.failed or not isinstance(ins_l.data, list):
2534
    raise errors.OpExecError("Can't contact node '%s'" %
2535
                             instance.primary_node)
2536

    
2537
  if instance.name in ins_l.data:
2538
    raise errors.OpExecError("Instance is running, can't shutdown"
2539
                             " block devices.")
2540

    
2541
  _ShutdownInstanceDisks(lu, instance)
2542

    
2543

    
2544
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2545
  """Shutdown block devices of an instance.
2546

2547
  This does the shutdown on all nodes of the instance.
2548

2549
  If ignore_primary is true, errors on the primary node are
2550
  ignored.
2551

2552
  """
2553
  all_result = True
2554
  for disk in instance.disks:
2555
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2556
      lu.cfg.SetDiskID(top_disk, node)
2557
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2558
      if result.failed or not result.data:
2559
        logging.error("Could not shutdown block device %s on node %s",
2560
                      disk.iv_name, node)
2561
        if not ignore_primary or node != instance.primary_node:
2562
          all_result = False
2563
  return all_result
2564

    
2565

    
2566
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2567
  """Checks if a node has enough free memory.
2568

2569
  This function check if a given node has the needed amount of free
2570
  memory. In case the node has less memory or we cannot get the
2571
  information from the node, this function raise an OpPrereqError
2572
  exception.
2573

2574
  @type lu: C{LogicalUnit}
2575
  @param lu: a logical unit from which we get configuration data
2576
  @type node: C{str}
2577
  @param node: the node to check
2578
  @type reason: C{str}
2579
  @param reason: string to use in the error message
2580
  @type requested: C{int}
2581
  @param requested: the amount of memory in MiB to check for
2582
  @type hypervisor_name: C{str}
2583
  @param hypervisor_name: the hypervisor to ask for memory stats
2584
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2585
      we cannot check the node
2586

2587
  """
2588
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2589
  nodeinfo[node].Raise()
2590
  free_mem = nodeinfo[node].data.get('memory_free')
2591
  if not isinstance(free_mem, int):
2592
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2593
                             " was '%s'" % (node, free_mem))
2594
  if requested > free_mem:
2595
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2596
                             " needed %s MiB, available %s MiB" %
2597
                             (node, reason, requested, free_mem))
2598

    
2599

    
2600
class LUStartupInstance(LogicalUnit):
2601
  """Starts an instance.
2602

2603
  """
2604
  HPATH = "instance-start"
2605
  HTYPE = constants.HTYPE_INSTANCE
2606
  _OP_REQP = ["instance_name", "force"]
2607
  REQ_BGL = False
2608

    
2609
  def ExpandNames(self):
2610
    self._ExpandAndLockInstance()
2611

    
2612
  def BuildHooksEnv(self):
2613
    """Build hooks env.
2614

2615
    This runs on master, primary and secondary nodes of the instance.
2616

2617
    """
2618
    env = {
2619
      "FORCE": self.op.force,
2620
      }
2621
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2622
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2623
    return env, nl, nl
2624

    
2625
  def CheckPrereq(self):
2626
    """Check prerequisites.
2627

2628
    This checks that the instance is in the cluster.
2629

2630
    """
2631
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2632
    assert self.instance is not None, \
2633
      "Cannot retrieve locked instance %s" % self.op.instance_name
2634

    
2635
    _CheckNodeOnline(self, instance.primary_node)
2636

    
2637
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2638
    # check bridges existance
2639
    _CheckInstanceBridgesExist(self, instance)
2640

    
2641
    _CheckNodeFreeMemory(self, instance.primary_node,
2642
                         "starting instance %s" % instance.name,
2643
                         bep[constants.BE_MEMORY], instance.hypervisor)
2644

    
2645
  def Exec(self, feedback_fn):
2646
    """Start the instance.
2647

2648
    """
2649
    instance = self.instance
2650
    force = self.op.force
2651
    extra_args = getattr(self.op, "extra_args", "")
2652

    
2653
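    # record the desired "up" state in the configuration before attempting
    # the actual start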
    self.cfg.MarkInstanceUp(instance.name)
2654

    
2655
    node_current = instance.primary_node
2656

    
2657
    _StartInstanceDisks(self, instance, force)
2658

    
2659
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2660
    msg = result.RemoteFailMsg()
2661
    if msg:
2662
      _ShutdownInstanceDisks(self, instance)
2663
      raise errors.OpExecError("Could not start instance: %s" % msg)
2664

    
2665

    
2666
class LURebootInstance(LogicalUnit):
2667
  """Reboot an instance.
2668

2669
  """
2670
  HPATH = "instance-reboot"
2671
  HTYPE = constants.HTYPE_INSTANCE
2672
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2673
  REQ_BGL = False
2674

    
2675
  def ExpandNames(self):
2676
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2677
                                   constants.INSTANCE_REBOOT_HARD,
2678
                                   constants.INSTANCE_REBOOT_FULL]:
2679
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2680
                                  (constants.INSTANCE_REBOOT_SOFT,
2681
                                   constants.INSTANCE_REBOOT_HARD,
2682
                                   constants.INSTANCE_REBOOT_FULL))
2683
    self._ExpandAndLockInstance()
2684

    
2685
  def BuildHooksEnv(self):
2686
    """Build hooks env.
2687

2688
    This runs on master, primary and secondary nodes of the instance.
2689

2690
    """
2691
    env = {
2692
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2693
      }
2694
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2695
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2696
    return env, nl, nl
2697

    
2698
  def CheckPrereq(self):
2699
    """Check prerequisites.
2700

2701
    This checks that the instance is in the cluster.
2702

2703
    """
2704
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2705
    assert self.instance is not None, \
2706
      "Cannot retrieve locked instance %s" % self.op.instance_name
2707

    
2708
    _CheckNodeOnline(self, instance.primary_node)
2709

    
2710
    # check bridges existance
2711
    _CheckInstanceBridgesExist(self, instance)
2712

    
2713
  def Exec(self, feedback_fn):
2714
    """Reboot the instance.
2715

2716
    """
2717
    instance = self.instance
2718
    ignore_secondaries = self.op.ignore_secondaries
2719
    reboot_type = self.op.reboot_type
2720
    extra_args = getattr(self.op, "extra_args", "")
2721

    
2722
    node_current = instance.primary_node
2723

    
2724
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2725
                       constants.INSTANCE_REBOOT_HARD]:
2726
      result = self.rpc.call_instance_reboot(node_current, instance,
2727
                                             reboot_type, extra_args)
2728
      if result.failed or not result.data:
2729
        raise errors.OpExecError("Could not reboot instance")
2730
    else:
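      # full reboot: shut the instance down, cycle its disks and start it
      # again; a failed restart leaves the disks shut down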
2731
      result = self.rpc.call_instance_shutdown(node_current, instance)
      if result.failed or not result.data:
2732
        raise errors.OpExecError("could not shutdown instance for full reboot")
2733
      _ShutdownInstanceDisks(self, instance)
2734
      _StartInstanceDisks(self, instance, ignore_secondaries)
2735
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2736
      msg = result.RemoteFailMsg()
2737
      if msg:
2738
        _ShutdownInstanceDisks(self, instance)
2739
        raise errors.OpExecError("Could not start instance for"
2740
                                 " full reboot: %s" % msg)
2741

    
2742
    self.cfg.MarkInstanceUp(instance.name)
2743

    
2744

    
2745
class LUShutdownInstance(LogicalUnit):
2746
  """Shutdown an instance.
2747

2748
  """
2749
  HPATH = "instance-stop"
2750
  HTYPE = constants.HTYPE_INSTANCE
2751
  _OP_REQP = ["instance_name"]
2752
  REQ_BGL = False
2753

    
2754
  def ExpandNames(self):
2755
    self._ExpandAndLockInstance()
2756

    
2757
  def BuildHooksEnv(self):
2758
    """Build hooks env.
2759

2760
    This runs on master, primary and secondary nodes of the instance.
2761

2762
    """
2763
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2764
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2765
    return env, nl, nl
2766

    
2767
  def CheckPrereq(self):
2768
    """Check prerequisites.
2769

2770
    This checks that the instance is in the cluster.
2771

2772
    """
2773
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2774
    assert self.instance is not None, \
2775
      "Cannot retrieve locked instance %s" % self.op.instance_name
2776
    _CheckNodeOnline(self, self.instance.primary_node)
2777

    
2778
  def Exec(self, feedback_fn):
2779
    """Shutdown the instance.
2780

2781
    """
2782
    instance = self.instance
2783
    node_current = instance.primary_node
2784
    self.cfg.MarkInstanceDown(instance.name)
2785
    result = self.rpc.call_instance_shutdown(node_current, instance)
2786
    if result.failed or not result.data:
2787
      self.proc.LogWarning("Could not shutdown instance")
2788

    
2789
    _ShutdownInstanceDisks(self, instance)
2790

    
2791

    
2792
class LUReinstallInstance(LogicalUnit):
2793
  """Reinstall an instance.
2794

2795
  """
2796
  HPATH = "instance-reinstall"
2797
  HTYPE = constants.HTYPE_INSTANCE
2798
  _OP_REQP = ["instance_name"]
2799
  REQ_BGL = False
2800

    
2801
  def ExpandNames(self):
2802
    self._ExpandAndLockInstance()
2803

    
2804
  def BuildHooksEnv(self):
2805
    """Build hooks env.
2806

2807
    This runs on master, primary and secondary nodes of the instance.
2808

2809
    """
2810
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2811
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2812
    return env, nl, nl
2813

    
2814
  def CheckPrereq(self):
2815
    """Check prerequisites.
2816

2817
    This checks that the instance is in the cluster and is not running.
2818

2819
    """
2820
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2821
    assert instance is not None, \
2822
      "Cannot retrieve locked instance %s" % self.op.instance_name
2823
    _CheckNodeOnline(self, instance.primary_node)
2824

    
2825
    if instance.disk_template == constants.DT_DISKLESS:
2826
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2827
                                 self.op.instance_name)
2828
    if instance.admin_up:
2829
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2830
                                 self.op.instance_name)
2831
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2832
                                              instance.name,
2833
                                              instance.hypervisor)
2834
    if remote_info.failed or remote_info.data:
2835
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2836
                                 (self.op.instance_name,
2837
                                  instance.primary_node))
2838

    
2839
    self.op.os_type = getattr(self.op, "os_type", None)
2840
    if self.op.os_type is not None:
2841
      # OS verification
2842
      pnode = self.cfg.GetNodeInfo(
2843
        self.cfg.ExpandNodeName(instance.primary_node))
2844
      if pnode is None:
2845
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2846
                                   instance.primary_node)
2847
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2848
      result.Raise()
2849
      if not isinstance(result.data, objects.OS):
2850
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2851
                                   " primary node"  % self.op.os_type)
2852

    
2853
    self.instance = instance
2854

    
2855
  def Exec(self, feedback_fn):
2856
    """Reinstall the instance.
2857

2858
    """
2859
    inst = self.instance
2860

    
2861
    if self.op.os_type is not None:
2862
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2863
      inst.os = self.op.os_type
2864
      self.cfg.Update(inst)
2865

    
2866
    _StartInstanceDisks(self, inst, None)
2867
    try:
2868
      feedback_fn("Running the instance OS create scripts...")
2869
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2870
      msg = result.RemoteFailMsg()
2871
      if msg:
2872
        raise errors.OpExecError("Could not install OS for instance %s"
2873
                                 " on node %s: %s" %
2874
                                 (inst.name, inst.primary_node, msg))
2875
    finally:
2876
      _ShutdownInstanceDisks(self, inst)
2877

    
2878

    
2879
class LURenameInstance(LogicalUnit):
2880
  """Rename an instance.
2881

2882
  """
2883
  HPATH = "instance-rename"
2884
  HTYPE = constants.HTYPE_INSTANCE
2885
  _OP_REQP = ["instance_name", "new_name"]
2886

    
2887
  def BuildHooksEnv(self):
2888
    """Build hooks env.
2889

2890
    This runs on master, primary and secondary nodes of the instance.
2891

2892
    """
2893
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2894
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2895
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2896
    return env, nl, nl
2897

    
2898
  def CheckPrereq(self):
2899
    """Check prerequisites.
2900

2901
    This checks that the instance is in the cluster and is not running.
2902

2903
    """
2904
    instance = self.cfg.GetInstanceInfo(
2905
      self.cfg.ExpandInstanceName(self.op.instance_name))
2906
    if instance is None:
2907
      raise errors.OpPrereqError("Instance '%s' not known" %
2908
                                 self.op.instance_name)
2909
    _CheckNodeOnline(self, instance.primary_node)
2910

    
2911
    if instance.admin_up:
2912
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2913
                                 self.op.instance_name)
2914
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2915
                                              instance.name,
2916
                                              instance.hypervisor)
2917
    remote_info.Raise()
2918
    if remote_info.data:
2919
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2920
                                 (self.op.instance_name,
2921
                                  instance.primary_node))
2922
    self.instance = instance
2923

    
2924
    # new name verification
2925
    name_info = utils.HostInfo(self.op.new_name)
2926

    
2927
    self.op.new_name = new_name = name_info.name
2928
    instance_list = self.cfg.GetInstanceList()
2929
    if new_name in instance_list:
2930
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2931
                                 new_name)
2932

    
2933
    if not getattr(self.op, "ignore_ip", False):
2934
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2935
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2936
                                   (name_info.ip, new_name))
2937

    
2938

    
2939
  def Exec(self, feedback_fn):
2940
    """Reinstall the instance.
2941

2942
    """
2943
    inst = self.instance
2944
    old_name = inst.name
2945

    
2946
    if inst.disk_template == constants.DT_FILE:
2947
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2948

    
2949
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2950
    # Change the instance lock. This is definitely safe while we hold the BGL
2951
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2952
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2953

    
2954
    # re-read the instance from the configuration after rename
2955
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2956

    
2957
    if inst.disk_template == constants.DT_FILE:
2958
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2959
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2960
                                                     old_file_storage_dir,
2961
                                                     new_file_storage_dir)
2962
      result.Raise()
2963
      if not result.data:
2964
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2965
                                 " directory '%s' to '%s' (but the instance"
2966
                                 " has been renamed in Ganeti)" % (
2967
                                 inst.primary_node, old_file_storage_dir,
2968
                                 new_file_storage_dir))
2969

    
2970
      if not result.data[0]:
2971
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2972
                                 " (but the instance has been renamed in"
2973
                                 " Ganeti)" % (old_file_storage_dir,
2974
                                               new_file_storage_dir))
2975

    
2976
    _StartInstanceDisks(self, inst, None)
2977
    try:
2978
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2979
                                                 old_name)
2980
      msg = result.RemoteFailMsg()
2981
      if msg:
2982
        msg = ("Could not run OS rename script for instance %s on node %s"
2983
               " (but the instance has been renamed in Ganeti): %s" %
2984
               (inst.name, inst.primary_node, msg))
2985
        self.proc.LogWarning(msg)
2986
    finally:
2987
      _ShutdownInstanceDisks(self, inst)
2988

    
2989

    
2990
class LURemoveInstance(LogicalUnit):
2991
  """Remove an instance.
2992

2993
  """
2994
  HPATH = "instance-remove"
2995
  HTYPE = constants.HTYPE_INSTANCE
2996
  _OP_REQP = ["instance_name", "ignore_failures"]
2997
  REQ_BGL = False
2998

    
2999
  def ExpandNames(self):
3000
    self._ExpandAndLockInstance()
3001
    self.needed_locks[locking.LEVEL_NODE] = []
3002
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3003

    
3004
  def DeclareLocks(self, level):
3005
    if level == locking.LEVEL_NODE:
3006
      self._LockInstancesNodes()
3007

    
3008
  def BuildHooksEnv(self):
3009
    """Build hooks env.
3010

3011
    This runs on master, primary and secondary nodes of the instance.
3012

3013
    """
3014
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3015
    nl = [self.cfg.GetMasterNode()]
3016
    return env, nl, nl
3017

    
3018
  def CheckPrereq(self):
3019
    """Check prerequisites.
3020

3021
    This checks that the instance is in the cluster.
3022

3023
    """
3024
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3025
    assert self.instance is not None, \
3026
      "Cannot retrieve locked instance %s" % self.op.instance_name
3027

    
3028
  def Exec(self, feedback_fn):
3029
    """Remove the instance.
3030

3031
    """
3032
    instance = self.instance
3033
    logging.info("Shutting down instance %s on node %s",
3034
                 instance.name, instance.primary_node)
3035

    
3036
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3037
    if result.failed or not result.data:
3038
      if self.op.ignore_failures:
3039
        feedback_fn("Warning: can't shutdown instance")
3040
      else:
3041
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3042
                                 (instance.name, instance.primary_node))
3043

    
3044
    logging.info("Removing block devices for instance %s", instance.name)
3045

    
3046
    if not _RemoveDisks(self, instance):
3047
      if self.op.ignore_failures:
3048
        feedback_fn("Warning: can't remove instance's disks")
3049
      else:
3050
        raise errors.OpExecError("Can't remove instance's disks")
3051

    
3052
    logging.info("Removing instance %s out of cluster config", instance.name)
3053

    
3054
    self.cfg.RemoveInstance(instance.name)
3055
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3056

    
3057

    
3058
class LUQueryInstances(NoHooksLU):
3059
  """Logical unit for querying instances.
3060

3061
  """
3062
  _OP_REQP = ["output_fields", "names"]
3063
  REQ_BGL = False
3064
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3065
                                    "admin_state", "admin_ram",
3066
                                    "disk_template", "ip", "mac", "bridge",
3067
                                    "sda_size", "sdb_size", "vcpus", "tags",
3068
                                    "network_port", "beparams",
3069
                                    "(disk).(size)/([0-9]+)",
3070
                                    "(disk).(sizes)",
3071
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
3072
                                    "(nic).(macs|ips|bridges)",
3073
                                    "(disk|nic).(count)",
3074
                                    "serial_no", "hypervisor", "hvparams",] +
3075
                                  ["hv/%s" % name
3076
                                   for name in constants.HVS_PARAMETERS] +
3077
                                  ["be/%s" % name
3078
                                   for name in constants.BES_PARAMETERS])
3079
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
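  # Note: the parenthesised entries in _FIELDS_STATIC above are regex-style
  # patterns rather than literal field names; judging from the matching code
  # in Exec below, they correspond to requests such as "disk.sizes",
  # "disk.size/0", "nic.count" or "nic.mac/1" (these example strings are
  # inferred from this file, not from external documentation).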
3080

    
3081

    
3082
  def ExpandNames(self):
3083
    _CheckOutputFields(static=self._FIELDS_STATIC,
3084
                       dynamic=self._FIELDS_DYNAMIC,
3085
                       selected=self.op.output_fields)
3086

    
3087
    self.needed_locks = {}
3088
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3089
    self.share_locks[locking.LEVEL_NODE] = 1
3090

    
3091
    if self.op.names:
3092
      self.wanted = _GetWantedInstances(self, self.op.names)
3093
    else:
3094
      self.wanted = locking.ALL_SET
3095

    
3096
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
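    # Node/instance locking (and hence live data collection) is only needed
    # when at least one requested field is outside the static set.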
3097
    if self.do_locking:
3098
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3099
      self.needed_locks[locking.LEVEL_NODE] = []
3100
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3101

    
3102
  def DeclareLocks(self, level):
3103
    if level == locking.LEVEL_NODE and self.do_locking:
3104
      self._LockInstancesNodes()
3105

    
3106
  def CheckPrereq(self):
3107
    """Check prerequisites.
3108

3109
    """
3110
    pass
3111

    
3112
  def Exec(self, feedback_fn):
3113
    """Computes the list of nodes and their attributes.
3114

3115
    """
3116
    all_info = self.cfg.GetAllInstancesInfo()
3117
    if self.wanted == locking.ALL_SET:
3118
      # caller didn't specify instance names, so ordering is not important
3119
      if self.do_locking:
3120
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3121
      else:
3122
        instance_names = all_info.keys()
3123
      instance_names = utils.NiceSort(instance_names)
3124
    else:
3125
      # caller did specify names, so we must keep the ordering
3126
      if self.do_locking:
3127
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3128
      else:
3129
        tgt_set = all_info.keys()
3130
      missing = set(self.wanted).difference(tgt_set)
3131
      if missing:
3132
        raise errors.OpExecError("Some instances were removed before"
3133
                                 " retrieving their data: %s" % missing)
3134
      instance_names = self.wanted
3135

    
3136
    instance_list = [all_info[iname] for iname in instance_names]
3137

    
3138
    # begin data gathering
3139

    
3140
    nodes = frozenset([inst.primary_node for inst in instance_list])
3141
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3142

    
3143
    bad_nodes = []
3144
    off_nodes = []
3145
    if self.do_locking:
3146
      live_data = {}
3147
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3148
      for name in nodes:
3149
        result = node_data[name]
3150
        if result.offline:
3151
          # offline nodes will be in both lists
3152
          off_nodes.append(name)
3153
        if result.failed:
3154
          bad_nodes.append(name)
3155
        else:
3156
          if result.data:
3157
            live_data.update(result.data)
3158
            # else no instance is alive
3159
    else:
3160
      live_data = dict([(name, {}) for name in instance_names])
3161

    
3162
    # end data gathering
3163

    
3164
    HVPREFIX = "hv/"
3165
    BEPREFIX = "be/"
3166
    output = []
3167
    for instance in instance_list:
3168
      iout = []
3169
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3170
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3171
      for field in self.op.output_fields:
3172
        st_match = self._FIELDS_STATIC.Matches(field)
3173
        if field == "name":
3174
          val = instance.name
3175
        elif field == "os":
3176
          val = instance.os
3177
        elif field == "pnode":
3178
          val = instance.primary_node
3179
        elif field == "snodes":
3180
          val = list(instance.secondary_nodes)
3181
        elif field == "admin_state":
3182
          val = instance.admin_up
3183
        elif field == "oper_state":
3184
          if instance.primary_node in bad_nodes:
3185
            val = None
3186
          else:
3187
            val = bool(live_data.get(instance.name))
3188
        elif field == "status":
3189
          if instance.primary_node in off_nodes:
3190
            val = "ERROR_nodeoffline"
3191
          elif instance.primary_node in bad_nodes:
3192
            val = "ERROR_nodedown"
3193
          else:
3194
            running = bool(live_data.get(instance.name))
3195
            if running:
3196
              if instance.admin_up:
3197
                val = "running"
3198
              else:
3199
                val = "ERROR_up"
3200
            else:
3201
              if instance.admin_up:
3202
                val = "ERROR_down"
3203
              else:
3204
                val = "ADMIN_down"
3205
        elif field == "oper_ram":
3206
          if instance.primary_node in bad_nodes:
3207
            val = None
3208
          elif instance.name in live_data:
3209
            val = live_data[instance.name].get("memory", "?")
3210
          else:
3211
            val = "-"
3212
        elif field == "disk_template":
3213
          val = instance.disk_template
3214
        elif field == "ip":
3215
          val = instance.nics[0].ip
3216
        elif field == "bridge":
3217
          val = instance.nics[0].bridge
3218
        elif field == "mac":
3219
          val = instance.nics[0].mac
3220
        elif field == "sda_size" or field == "sdb_size":
3221
          idx = ord(field[2]) - ord('a')
3222
          try:
3223
            val = instance.FindDisk(idx).size
3224
          except errors.OpPrereqError:
3225
            val = None
3226
        elif field == "tags":
3227
          val = list(instance.GetTags())
3228
        elif field == "serial_no":
3229
          val = instance.serial_no
3230
        elif field == "network_port":
3231
          val = instance.network_port
3232
        elif field == "hypervisor":
3233
          val = instance.hypervisor
3234
        elif field == "hvparams":
3235
          val = i_hv
3236
        elif (field.startswith(HVPREFIX) and
3237
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3238
          val = i_hv.get(field[len(HVPREFIX):], None)
3239
        elif field == "beparams":
3240
          val = i_be
3241
        elif (field.startswith(BEPREFIX) and
3242
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3243
          val = i_be.get(field[len(BEPREFIX):], None)
3244
        elif st_match and st_match.groups():
3245
          # matches a variable list
3246
          st_groups = st_match.groups()
3247
          if st_groups and st_groups[0] == "disk":
3248
            if st_groups[1] == "count":
3249
              val = len(instance.disks)
3250
            elif st_groups[1] == "sizes":
3251
              val = [disk.size for disk in instance.disks]
3252
            elif st_groups[1] == "size":
3253
              try:
3254
                val = instance.FindDisk(st_groups[2]).size
3255
              except errors.OpPrereqError:
3256
                val = None
3257
            else:
3258
              assert False, "Unhandled disk parameter"
3259
          elif st_groups[0] == "nic":
3260
            if st_groups[1] == "count":
3261
              val = len(instance.nics)
3262
            elif st_groups[1] == "macs":
3263
              val = [nic.mac for nic in instance.nics]
3264
            elif st_groups[1] == "ips":
3265
              val = [nic.ip for nic in instance.nics]
3266
            elif st_groups[1] == "bridges":
3267
              val = [nic.bridge for nic in instance.nics]
3268
            else:
3269
              # index-based item
3270
              nic_idx = int(st_groups[2])
3271
              if nic_idx >= len(instance.nics):
3272
                val = None
3273
              else:
3274
                if st_groups[1] == "mac":
3275
                  val = instance.nics[nic_idx].mac
3276
                elif st_groups[1] == "ip":
3277
                  val = instance.nics[nic_idx].ip
3278
                elif st_groups[1] == "bridge":
3279
                  val = instance.nics[nic_idx].bridge
3280
                else:
3281
                  assert False, "Unhandled NIC parameter"
3282
          else:
3283
            assert False, "Unhandled variable parameter"
3284
        else:
3285
          raise errors.ParameterError(field)
3286
        iout.append(val)
3287
      output.append(iout)
3288

    
3289
    return output
3290

    
3291

    
3292
class LUFailoverInstance(LogicalUnit):
3293
  """Failover an instance.
3294

3295
  """
3296
  HPATH = "instance-failover"
3297
  HTYPE = constants.HTYPE_INSTANCE
3298
  _OP_REQP = ["instance_name", "ignore_consistency"]
3299
  REQ_BGL = False
3300

    
3301
  def ExpandNames(self):
3302
    self._ExpandAndLockInstance()
3303
    self.needed_locks[locking.LEVEL_NODE] = []
3304
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3305

    
3306
  def DeclareLocks(self, level):
3307
    if level == locking.LEVEL_NODE:
3308
      self._LockInstancesNodes()
3309

    
3310
  def BuildHooksEnv(self):
3311
    """Build hooks env.
3312

3313
    This runs on master, primary and secondary nodes of the instance.
3314

3315
    """
3316
    env = {
3317
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3318
      }
3319
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3320
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3321
    return env, nl, nl
3322

    
3323
  def CheckPrereq(self):
3324
    """Check prerequisites.
3325

3326
    This checks that the instance is in the cluster.
3327

3328
    """
3329
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3330
    assert self.instance is not None, \
3331
      "Cannot retrieve locked instance %s" % self.op.instance_name
3332

    
3333
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3334
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3335
      raise errors.OpPrereqError("Instance's disk layout is not"
3336
                                 " network mirrored, cannot failover.")
3337

    
3338
    secondary_nodes = instance.secondary_nodes
3339
    if not secondary_nodes:
3340
      raise errors.ProgrammerError("no secondary node but using "
3341
                                   "a mirrored disk template")
3342

    
3343
    target_node = secondary_nodes[0]
3344
    _CheckNodeOnline(self, target_node)
3345
    # check memory requirements on the secondary node
3346
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3347
                         instance.name, bep[constants.BE_MEMORY],
3348
                         instance.hypervisor)
3349

    
3350
    # check bridge existence
3351
    brlist = [nic.bridge for nic in instance.nics]
3352
    result = self.rpc.call_bridges_exist(target_node, brlist)
3353
    result.Raise()
3354
    if not result.data:
3355
      raise errors.OpPrereqError("One or more target bridges %s does not"
3356
                                 " exist on destination node '%s'" %
3357
                                 (brlist, target_node))
3358

    
3359
  def Exec(self, feedback_fn):
3360
    """Failover an instance.
3361

3362
    The failover is done by shutting it down on its present node and
3363
    starting it on the secondary.
3364

3365
    """
3366
    instance = self.instance
3367

    
3368
    source_node = instance.primary_node
3369
    target_node = instance.secondary_nodes[0]
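    # Failover sequence, as implemented below: verify disk consistency on the
    # target, shut the instance down on the source, deactivate its disks
    # there, flip primary_node in the configuration and, if the instance is
    # marked up, reassemble the disks and start it on the target.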
3370

    
3371
    feedback_fn("* checking disk consistency between source and target")
3372
    for dev in instance.disks:
3373
      # for drbd, these are drbd over lvm
3374
      if not _CheckDiskConsistency(self, dev, target_node, False):
3375
        if instance.admin_up and not self.op.ignore_consistency:
3376
          raise errors.OpExecError("Disk %s is degraded on target node,"
3377
                                   " aborting failover." % dev.iv_name)
3378

    
3379
    feedback_fn("* shutting down instance on source node")
3380
    logging.info("Shutting down instance %s on node %s",
3381
                 instance.name, source_node)
3382

    
3383
    result = self.rpc.call_instance_shutdown(source_node, instance)
3384
    if result.failed or not result.data:
3385
      if self.op.ignore_consistency:
3386
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3387
                             " Proceeding"
3388
                             " anyway. Please make sure node %s is down",
3389
                             instance.name, source_node, source_node)
3390
      else:
3391
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3392
                                 (instance.name, source_node))
3393

    
3394
    feedback_fn("* deactivating the instance's disks on source node")
3395
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3396
      raise errors.OpExecError("Can't shut down the instance's disks.")
3397

    
3398
    instance.primary_node = target_node
3399
    # distribute new instance config to the other nodes
3400
    self.cfg.Update(instance)
3401

    
3402
    # Only start the instance if it's marked as up
3403
    if instance.admin_up:
3404
      feedback_fn("* activating the instance's disks on target node")
3405
      logging.info("Starting instance %s on node %s",
3406
                   instance.name, target_node)
3407

    
3408
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3409
                                               ignore_secondaries=True)
3410
      if not disks_ok:
3411
        _ShutdownInstanceDisks(self, instance)
3412
        raise errors.OpExecError("Can't activate the instance's disks")
3413

    
3414
      feedback_fn("* starting the instance on the target node")
3415
      result = self.rpc.call_instance_start(target_node, instance, None)
3416
      msg = result.RemoteFailMsg()
3417
      if msg:
3418
        _ShutdownInstanceDisks(self, instance)
3419
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3420
                                 (instance.name, target_node, msg))
3421

    
3422

    
3423
class LUMigrateInstance(LogicalUnit):
3424
  """Migrate an instance.
3425

3426
  This is migration without shutting down, compared to the failover,
3427
  which is done with shutdown.
3428

3429
  """
3430
  HPATH = "instance-migrate"
3431
  HTYPE = constants.HTYPE_INSTANCE
3432
  _OP_REQP = ["instance_name", "live", "cleanup"]
3433

    
3434
  REQ_BGL = False
3435

    
3436
  def ExpandNames(self):
3437
    self._ExpandAndLockInstance()
3438
    self.needed_locks[locking.LEVEL_NODE] = []
3439
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3440

    
3441
  def DeclareLocks(self, level):
3442
    if level == locking.LEVEL_NODE:
3443
      self._LockInstancesNodes()
3444

    
3445
  def BuildHooksEnv(self):
3446
    """Build hooks env.
3447

3448
    This runs on master, primary and secondary nodes of the instance.
3449

3450
    """
3451
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3452
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3453
    return env, nl, nl
3454

    
3455
  def CheckPrereq(self):
3456
    """Check prerequisites.
3457

3458
    This checks that the instance is in the cluster.
3459

3460
    """
3461
    instance = self.cfg.GetInstanceInfo(
3462
      self.cfg.ExpandInstanceName(self.op.instance_name))
3463
    if instance is None:
3464
      raise errors.OpPrereqError("Instance '%s' not known" %
3465
                                 self.op.instance_name)
3466

    
3467
    if instance.disk_template != constants.DT_DRBD8:
3468
      raise errors.OpPrereqError("Instance's disk layout is not"
3469
                                 " drbd8, cannot migrate.")
3470

    
3471
    secondary_nodes = instance.secondary_nodes
3472
    if not secondary_nodes:
3473
      raise errors.ProgrammerError("no secondary node but using "
3474
                                   "drbd8 disk template")
3475

    
3476
    i_be = self.cfg.GetClusterInfo().FillBE(instance)
3477

    
3478
    target_node = secondary_nodes[0]
3479
    # check memory requirements on the secondary node
3480
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3481
                         instance.name, i_be[constants.BE_MEMORY],
3482
                         instance.hypervisor)
3483

    
3484
    # check bridge existence
3485
    brlist = [nic.bridge for nic in instance.nics]
3486
    result = self.rpc.call_bridges_exist(target_node, brlist)
3487
    if result.failed or not result.data:
3488
      raise errors.OpPrereqError("One or more target bridges %s does not"
3489
                                 " exist on destination node '%s'" %
3490
                                 (brlist, target_node))
3491

    
3492
    if not self.op.cleanup:
3493
      result = self.rpc.call_instance_migratable(instance.primary_node,
3494
                                                 instance)
3495
      msg = result.RemoteFailMsg()
3496
      if msg:
3497
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3498
                                   msg)
3499

    
3500
    self.instance = instance
3501

    
3502
  def _WaitUntilSync(self):
3503
    """Poll with custom rpc for disk sync.
3504

3505
    This uses our own step-based rpc call.
3506

3507
    """
3508
    self.feedback_fn("* wait until resync is done")
3509
    all_done = False
3510
    while not all_done:
3511
      all_done = True
3512
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3513
                                            self.nodes_ip,
3514
                                            self.instance.disks)
3515
      min_percent = 100
3516
      for node, nres in result.items():
3517
        msg = nres.RemoteFailMsg()
3518
        if msg:
3519
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3520
                                   (node, msg))
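        # each successful per-node payload is expected to carry a
        # (done, sync_percent) pair in data[1], unpacked below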
3521
        node_done, node_percent = nres.data[1]
3522
        all_done = all_done and node_done
3523
        if node_percent is not None:
3524
          min_percent = min(min_percent, node_percent)
3525
      if not all_done:
3526
        if min_percent < 100:
3527
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3528
        time.sleep(2)
3529

    
3530
  def _EnsureSecondary(self, node):
3531
    """Demote a node to secondary.
3532

3533
    """
3534
    self.feedback_fn("* switching node %s to secondary mode" % node)
3535

    
3536
    for dev in self.instance.disks:
3537
      self.cfg.SetDiskID(dev, node)
3538

    
3539
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3540
                                          self.instance.disks)
3541
    msg = result.RemoteFailMsg()
3542
    if msg:
3543
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3544
                               " error %s" % (node, msg))
3545

    
3546
  def _GoStandalone(self):
3547
    """Disconnect from the network.
3548

3549
    """
3550
    self.feedback_fn("* changing into standalone mode")
3551
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3552
                                               self.instance.disks)
3553
    for node, nres in result.items():
3554
      msg = nres.RemoteFailMsg()
3555
      if msg:
3556
        raise errors.OpExecError("Cannot disconnect disks node %s,"
3557
                                 " error %s" % (node, msg))
3558

    
3559
  def _GoReconnect(self, multimaster):
3560
    """Reconnect to the network.
3561

3562
    """
3563
    if multimaster:
3564
      msg = "dual-master"
3565
    else:
3566
      msg = "single-master"
3567
    self.feedback_fn("* changing disks into %s mode" % msg)
3568
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3569
                                           self.instance.disks,
3570
                                           self.instance.name, multimaster)
3571
    for node, nres in result.items():
3572
      msg = nres.RemoteFailMsg()
3573
      if msg:
3574
        raise errors.OpExecError("Cannot change disks config on node %s,"
3575
                                 " error: %s" % (node, msg))
3576

    
3577
  def _ExecCleanup(self):
3578
    """Try to cleanup after a failed migration.
3579

3580
    The cleanup is done by:
3581
      - check that the instance is running only on one node
3582
        (and update the config if needed)
3583
      - change disks on its secondary node to secondary
3584
      - wait until disks are fully synchronized
3585
      - disconnect from the network
3586
      - change disks into single-master mode
3587
      - wait again until disks are fully synchronized
3588

3589
    """
3590
    instance = self.instance
3591
    target_node = self.target_node
3592
    source_node = self.source_node
3593

    
3594
    # check running on only one node
3595
    self.feedback_fn("* checking where the instance actually runs"
3596
                     " (if this hangs, the hypervisor might be in"
3597
                     " a bad state)")
3598
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3599
    for node, result in ins_l.items():
3600
      result.Raise()
3601
      if not isinstance(result.data, list):
3602
        raise errors.OpExecError("Can't contact node '%s'" % node)
3603

    
3604
    runningon_source = instance.name in ins_l[source_node].data
3605
    runningon_target = instance.name in ins_l[target_node].data
3606

    
3607
    if runningon_source and runningon_target:
3608
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3609
                               " or the hypervisor is confused. You will have"
3610
                               " to ensure manually that it runs only on one"
3611
                               " and restart this operation.")
3612

    
3613
    if not (runningon_source or runningon_target):
3614
      raise errors.OpExecError("Instance does not seem to be running at all."
3615
                               " In this case, it's safer to repair by"
3616
                               " running 'gnt-instance stop' to ensure disk"
3617
                               " shutdown, and then restarting it.")
3618

    
3619
    if runningon_target:
3620
      # the migration has actually succeeded, we need to update the config
3621
      self.feedback_fn("* instance running on secondary node (%s),"
3622
                       " updating config" % target_node)
3623
      instance.primary_node = target_node
3624
      self.cfg.Update(instance)
3625
      demoted_node = source_node
3626
    else:
3627
      self.feedback_fn("* instance confirmed to be running on its"
3628
                       " primary node (%s)" % source_node)
3629
      demoted_node = target_node
3630

    
3631
    self._EnsureSecondary(demoted_node)
3632
    try:
3633
      self._WaitUntilSync()
3634
    except errors.OpExecError:
3635
      # we ignore here errors, since if the device is standalone, it
3636
      # won't be able to sync
3637
      pass
3638
    self._GoStandalone()
3639
    self._GoReconnect(False)
3640
    self._WaitUntilSync()
3641

    
3642
    self.feedback_fn("* done")
3643

    
3644
  def _RevertDiskStatus(self):
3645
    """Try to revert the disk status after a failed migration.
3646

3647
    """
3648
    target_node = self.target_node
3649
    try:
3650
      self._EnsureSecondary(target_node)
3651
      self._GoStandalone()
3652
      self._GoReconnect(False)
3653
      self._WaitUntilSync()
3654
    except errors.OpExecError, err:
3655
      self.LogWarning("Migration failed and I can't reconnect the"
3656
                      " drives: error '%s'\n"
3657
                      "Please look and recover the instance status" %
3658
                      str(err))
3659

    
3660
  def _AbortMigration(self):
3661
    """Call the hypervisor code to abort a started migration.
3662

3663
    """
3664
    instance = self.instance
3665
    target_node = self.target_node
3666
    migration_info = self.migration_info
3667

    
3668
    abort_result = self.rpc.call_finalize_migration(target_node,
3669
                                                    instance,
3670
                                                    migration_info,
3671
                                                    False)
3672
    abort_msg = abort_result.RemoteFailMsg()
3673
    if abort_msg:
3674
      logging.error("Aborting migration failed on target node %s: %s" %
3675
                    (target_node, abort_msg))
3676
      # Don't raise an exception here, as we still have to try to revert the
3677
      # disk status, even if this step failed.
3678

    
3679
  def _ExecMigration(self):
3680
    """Migrate an instance.
3681

3682
    The migrate is done by:
3683
      - change the disks into dual-master mode
3684
      - wait until disks are fully synchronized again
3685
      - migrate the instance
3686
      - change disks on the new secondary node (the old primary) to secondary
3687
      - wait until disks are fully synchronized
3688
      - change disks into single-master mode
3689

3690
    """
3691
    instance = self.instance
3692
    target_node = self.target_node
3693
    source_node = self.source_node
3694

    
3695
    self.feedback_fn("* checking disk consistency between source and target")
3696
    for dev in instance.disks:
3697
      if not _CheckDiskConsistency(self, dev, target_node, False):
3698
        raise errors.OpExecError("Disk %s is degraded or not fully"
3699
                                 " synchronized on target node,"
3700
                                 " aborting migrate." % dev.iv_name)
3701

    
3702
    # First get the migration information from the remote node
3703
    result = self.rpc.call_migration_info(source_node, instance)
3704
    msg = result.RemoteFailMsg()
3705
    if msg:
3706
      log_err = ("Failed fetching source migration information from %s: %s" %
3707
                  (source_node, msg))
3708
      logging.error(log_err)
3709
      raise errors.OpExecError(log_err)
3710

    
3711
    self.migration_info = migration_info = result.data[1]
3712

    
3713
    # Then switch the disks to master/master mode
3714
    self._EnsureSecondary(target_node)
3715
    self._GoStandalone()
3716
    self._GoReconnect(True)
3717
    self._WaitUntilSync()
3718

    
3719
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3720
    result = self.rpc.call_accept_instance(target_node,
3721
                                           instance,
3722
                                           migration_info,
3723
                                           self.nodes_ip[target_node])
3724

    
3725
    msg = result.RemoteFailMsg()
3726
    if msg:
3727
      logging.error("Instance pre-migration failed, trying to revert"
3728
                    " disk status: %s", msg)
3729
      self._AbortMigration()
3730
      self._RevertDiskStatus()
3731
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3732
                               (instance.name, msg))
3733

    
3734
    self.feedback_fn("* migrating instance to %s" % target_node)
3735
    time.sleep(10)
3736
    result = self.rpc.call_instance_migrate(source_node, instance,
3737
                                            self.nodes_ip[target_node],
3738
                                            self.op.live)
3739
    msg = result.RemoteFailMsg()
3740
    if msg:
3741
      logging.error("Instance migration failed, trying to revert"
3742
                    " disk status: %s", msg)
3743
      self._AbortMigration()
3744
      self._RevertDiskStatus()
3745
      raise errors.OpExecError("Could not migrate instance %s: %s" %
3746
                               (instance.name, msg))
3747
    time.sleep(10)
3748

    
3749
    instance.primary_node = target_node
3750
    # distribute new instance config to the other nodes
3751
    self.cfg.Update(instance)
3752

    
3753
    result = self.rpc.call_finalize_migration(target_node,
3754
                                              instance,
3755
                                              migration_info,
3756
                                              True)
3757
    msg = result.RemoteFailMsg()
3758
    if msg:
3759
      logging.error("Instance migration succeeded, but finalization failed:"
3760
                    " %s" % msg)
3761
      raise errors.OpExecError("Could not finalize instance migration: %s" %
3762
                               msg)
3763

    
3764
    self._EnsureSecondary(source_node)
3765
    self._WaitUntilSync()
3766
    self._GoStandalone()
3767
    self._GoReconnect(False)
3768
    self._WaitUntilSync()
3769

    
3770
    self.feedback_fn("* done")
3771

    
3772
  def Exec(self, feedback_fn):
3773
    """Perform the migration.
3774

3775
    """
3776
    self.feedback_fn = feedback_fn
3777

    
3778
    self.source_node = self.instance.primary_node
3779
    self.target_node = self.instance.secondary_nodes[0]
3780
    self.all_nodes = [self.source_node, self.target_node]
3781
    self.nodes_ip = {
3782
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3783
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3784
      }
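    # Note that both the DRBD reconfiguration helpers and the migration RPC
    # below are addressed via the nodes' secondary IPs gathered here, i.e.
    # (normally) the replication network.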
3785
    if self.op.cleanup:
3786
      return self._ExecCleanup()
3787
    else:
3788
      return self._ExecMigration()
3789

    
3790

    
3791
def _CreateBlockDev(lu, node, instance, device, force_create,
3792
                    info, force_open):
3793
  """Create a tree of block devices on a given node.
3794

3795
  If this device type has to be created on secondaries, create it and
3796
  all its children.
3797

3798
  If not, just recurse to children keeping the same 'force' value.
3799

3800
  @param lu: the lu on whose behalf we execute
3801
  @param node: the node on which to create the device
3802
  @type instance: L{objects.Instance}
3803
  @param instance: the instance which owns the device
3804
  @type device: L{objects.Disk}
3805
  @param device: the device to create
3806
  @type force_create: boolean
3807
  @param force_create: whether to force creation of this device; this
3808
      will be changed to True whenever we find a device which has the
3809
      CreateOnSecondary() attribute
3810
  @param info: the extra 'metadata' we should attach to the device
3811
      (this will be represented as a LVM tag)
3812
  @type force_open: boolean
3813
  @param force_open: this parameter will be passed to the
3814
      L{backend.CreateBlockDevice} function where it specifies
3815
      whether we run on primary or not, and it affects both
3816
      the child assembly and the device's own Open() execution
3817

3818
  """
3819
  if device.CreateOnSecondary():
3820
    force_create = True
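  # For a DRBD8 disk, for instance, this recursion creates the data and
  # metadata LV children (cf. _GenerateDRBD8Branch) before the DRBD device
  # itself, since _CreateSingleBlockDev requires children to exist already.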
3821

    
3822
  if device.children:
3823
    for child in device.children:
3824
      _CreateBlockDev(lu, node, instance, child, force_create,
3825
                      info, force_open)
3826

    
3827
  if not force_create:
3828
    return
3829

    
3830
  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3831

    
3832

    
3833
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3834
  """Create a single block device on a given node.
3835

3836
  This will not recurse over children of the device, so they must be
3837
  created in advance.
3838

3839
  @param lu: the lu on whose behalf we execute
3840
  @param node: the node on which to create the device
3841
  @type instance: L{objects.Instance}
3842
  @param instance: the instance which owns the device
3843
  @type device: L{objects.Disk}
3844
  @param device: the device to create
3845
  @param info: the extra 'metadata' we should attach to the device
3846
      (this will be represented as a LVM tag)
3847
  @type force_open: boolean
3848
  @param force_open: this parameter will be passed to the
3849
      L{backend.CreateBlockDevice} function where it specifies
3850
      whether we run on primary or not, and it affects both
3851
      the child assembly and the device's own Open() execution
3852

3853
  """
3854
  lu.cfg.SetDiskID(device, node)
3855
  result = lu.rpc.call_blockdev_create(node, device, device.size,
3856
                                       instance.name, force_open, info)
3857
  msg = result.RemoteFailMsg()
3858
  if msg:
3859
    raise errors.OpExecError("Can't create block device %s on"
3860
                             " node %s for instance %s: %s" %
3861
                             (device, node, instance.name, msg))
3862
  if device.physical_id is None:
3863
    device.physical_id = result.data[1]
3864

    
3865

    
3866
def _GenerateUniqueNames(lu, exts):
3867
  """Generate a suitable LV name.
3868

3869
  This will generate a logical volume name for the given instance.
3870

3871
  """
3872
  results = []
3873
  for val in exts:
3874
    new_id = lu.cfg.GenerateUniqueID()
3875
    results.append("%s%s" % (new_id, val))
3876
  return results
3877

    
3878

    
3879
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3880
                         p_minor, s_minor):
3881
  """Generate a drbd8 device complete with its children.
3882

3883
  """
3884
  port = lu.cfg.AllocatePort()
3885
  vgname = lu.cfg.GetVGName()
3886
  shared_secret = lu.cfg.GenerateDRBDSecret()
3887
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3888
                          logical_id=(vgname, names[0]))
3889
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3890
                          logical_id=(vgname, names[1]))
3891
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3892
                          logical_id=(primary, secondary, port,
3893
                                      p_minor, s_minor,
3894
                                      shared_secret),
3895
                          children=[dev_data, dev_meta],
3896
                          iv_name=iv_name)
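  # The result is a single LD_DRBD8 device whose logical_id packs
  # (primary, secondary, port, p_minor, s_minor, shared_secret) and whose
  # children are the data LV (full size) and a 128 MB metadata LV.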
3897
  return drbd_dev
3898

    
3899

    
3900
def _GenerateDiskTemplate(lu, template_name,
3901
                          instance_name, primary_node,
3902
                          secondary_nodes, disk_info,
3903
                          file_storage_dir, file_driver,
3904
                          base_index):
3905
  """Generate the entire disk layout for a given template type.
3906

3907
  """
3908
  #TODO: compute space requirements
3909

    
3910
  vgname = lu.cfg.GetVGName()
3911
  disk_count = len(disk_info)
3912
  disks = []
3913
  if template_name == constants.DT_DISKLESS:
3914
    pass
3915
  elif template_name == constants.DT_PLAIN:
3916
    if len(secondary_nodes) != 0:
3917
      raise errors.ProgrammerError("Wrong template configuration")
3918

    
3919
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3920
                                      for i in range(disk_count)])
3921
    for idx, disk in enumerate(disk_info):
3922
      disk_index = idx + base_index
3923
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3924
                              logical_id=(vgname, names[idx]),
3925
                              iv_name="disk/%d" % disk_index,
3926
                              mode=disk["mode"])
3927
      disks.append(disk_dev)
3928
  elif template_name == constants.DT_DRBD8:
3929
    if len(secondary_nodes) != 1:
3930
      raise errors.ProgrammerError("Wrong template configuration")
3931
    remote_node = secondary_nodes[0]
3932
    minors = lu.cfg.AllocateDRBDMinor(
3933
      [primary_node, remote_node] * len(disk_info), instance_name)
3934

    
3935
    names = []
3936
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3937
                                               for i in range(disk_count)]):
3938
      names.append(lv_prefix + "_data")
3939
      names.append(lv_prefix + "_meta")
3940
    for idx, disk in enumerate(disk_info):
3941
      disk_index = idx + base_index
3942
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3943
                                      disk["size"], names[idx*2:idx*2+2],
3944
                                      "disk/%d" % disk_index,
3945
                                      minors[idx*2], minors[idx*2+1])
3946
      disk_dev.mode = disk["mode"]
3947
      disks.append(disk_dev)
3948
  elif template_name == constants.DT_FILE:
3949
    if len(secondary_nodes) != 0:
3950
      raise errors.ProgrammerError("Wrong template configuration")
3951

    
3952
    for idx, disk in enumerate(disk_info):
3953
      disk_index = idx + base_index
3954
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3955
                              iv_name="disk/%d" % disk_index,
3956
                              logical_id=(file_driver,
3957
                                          "%s/disk%d" % (file_storage_dir,
3958
                                                         idx)),
3959
                              mode=disk["mode"])
3960
      disks.append(disk_dev)
3961
  else:
3962
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3963
  return disks
3964

    
3965

    
3966
def _GetInstanceInfoText(instance):
3967
  """Compute that text that should be added to the disk's metadata.
3968

3969
  """
3970
  return "originstname+%s" % instance.name
3971

    
3972

    
3973
def _CreateDisks(lu, instance):
3974
  """Create all disks for an instance.
3975

3976
  This abstracts away some work from AddInstance.
3977

3978
  @type lu: L{LogicalUnit}
3979
  @param lu: the logical unit on whose behalf we execute
3980
  @type instance: L{objects.Instance}
3981
  @param instance: the instance whose disks we should create
3982
  @rtype: boolean
3983
  @return: the success of the creation
3984

3985
  """
3986
  info = _GetInstanceInfoText(instance)
3987
  pnode = instance.primary_node
3988

    
3989
  if instance.disk_template == constants.DT_FILE:
3990
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3991
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
3992

    
3993
    if result.failed or not result.data:
3994
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
3995

    
3996
    if not result.data[0]:
3997
      raise errors.OpExecError("Failed to create directory '%s'" %
3998
                               file_storage_dir)
3999

    
4000
  # Note: this needs to be kept in sync with adding of disks in
4001
  # LUSetInstanceParams
4002
  for device in instance.disks:
4003
    logging.info("Creating volume %s for instance %s",
4004
                 device.iv_name, instance.name)
4005
    #HARDCODE
4006
    for node in instance.all_nodes:
4007
      f_create = node == pnode
4008
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4009

    
4010

    
4011
def _RemoveDisks(lu, instance):
4012
  """Remove all disks for an instance.
4013

4014
  This abstracts away some work from `AddInstance()` and
4015
  `RemoveInstance()`. Note that in case some of the devices couldn't
4016
  be removed, the removal will continue with the other ones (compare
4017
  with `_CreateDisks()`).
4018

4019
  @type lu: L{LogicalUnit}
4020
  @param lu: the logical unit on whose behalf we execute
4021
  @type instance: L{objects.Instance}
4022
  @param instance: the instance whose disks we should remove
4023
  @rtype: boolean
4024
  @return: the success of the removal
4025

4026
  """
4027
  logging.info("Removing block devices for instance %s", instance.name)
4028

    
4029
  result = True
4030
  for device in instance.disks:
4031
    for node, disk in device.ComputeNodeTree(instance.primary_node):
4032
      lu.cfg.SetDiskID(disk, node)
4033
      result = lu.rpc.call_blockdev_remove(node, disk)
4034
      if result.failed or not result.data:
4035
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
4036
                           " continuing anyway", device.iv_name, node)
4037
        result = False
4038

    
4039
  if instance.disk_template == constants.DT_FILE:
4040
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4041
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4042
                                                 file_storage_dir)
4043
    if result.failed or not result.data:
4044
      logging.error("Could not remove directory '%s'", file_storage_dir)
4045
      result = False
4046

    
4047
  return result
4048

    
4049

    
4050
def _ComputeDiskSize(disk_template, disks):
4051
  """Compute disk size requirements in the volume group
4052

4053
  """
4054
  # Required free disk space as a function of disk template and disk sizes
4055
  req_size_dict = {
4056
    constants.DT_DISKLESS: None,
4057
    constants.DT_PLAIN: sum(d["size"] for d in disks),
4058
    # 128 MB are added for drbd metadata for each disk
4059
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4060
    constants.DT_FILE: None,
4061
  }
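  # Worked example: for DT_DRBD8 with two disks of 1024 and 2048 (MB, as
  # elsewhere in this file) the requirement is (1024 + 128) + (2048 + 128)
  # = 3328, while DT_FILE and DT_DISKLESS need no volume group space at all.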
4062

    
4063
  if disk_template not in req_size_dict:
4064
    raise errors.ProgrammerError("Disk template '%s' size requirement"
4065
                                 " is unknown" %  disk_template)
4066

    
4067
  return req_size_dict[disk_template]
4068

    
4069

    
4070
def _CheckHVParams(lu, nodenames, hvname, hvparams):
4071
  """Hypervisor parameter validation.
4072

4073
  This function abstract the hypervisor parameter validation to be
4074
  used in both instance create and instance modify.
4075

4076
  @type lu: L{LogicalUnit}
4077
  @param lu: the logical unit for which we check
4078
  @type nodenames: list
4079
  @param nodenames: the list of nodes on which we should check
4080
  @type hvname: string
4081
  @param hvname: the name of the hypervisor we should use
4082
  @type hvparams: dict
4083
  @param hvparams: the parameters which we need to check
4084
  @raise errors.OpPrereqError: if the parameters are not valid
4085

4086
  """
4087
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4088
                                                  hvname,
4089
                                                  hvparams)
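  # per the checks below, each per-node result payload is expected to be a
  # (success, message) tuple; offline nodes are skipped entirely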
4090
  for node in nodenames:
4091
    info = hvinfo[node]
4092
    if info.offline:
4093
      continue
4094
    info.Raise()
4095
    if not info.data or not isinstance(info.data, (tuple, list)):
4096
      raise errors.OpPrereqError("Cannot get current information"
4097
                                 " from node '%s' (%s)" % (node, info.data))
4098
    if not info.data[0]:
4099
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4100
                                 " %s" % info.data[1])
4101

    
4102

    
4103
class LUCreateInstance(LogicalUnit):
4104
  """Create an instance.
4105

4106
  """
4107
  HPATH = "instance-add"
4108
  HTYPE = constants.HTYPE_INSTANCE
4109
  _OP_REQP = ["instance_name", "disks", "disk_template",
4110
              "mode", "start",
4111
              "wait_for_sync", "ip_check", "nics",
4112
              "hvparams", "beparams"]
4113
  REQ_BGL = False
4114

    
4115
  def _ExpandNode(self, node):
4116
    """Expands and checks one node name.
4117

4118
    """
4119
    node_full = self.cfg.ExpandNodeName(node)
4120
    if node_full is None:
4121
      raise errors.OpPrereqError("Unknown node %s" % node)
4122
    return node_full
4123

    
4124
  def ExpandNames(self):
4125
    """ExpandNames for CreateInstance.
4126

4127
    Figure out the right locks for instance creation.
4128

4129
    """
4130
    self.needed_locks = {}
4131

    
4132
    # set optional parameters to none if they don't exist
4133
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4134
      if not hasattr(self.op, attr):
4135
        setattr(self.op, attr, None)
4136

    
4137
    # cheap checks, mostly valid constants given
4138

    
4139
    # verify creation mode
4140
    if self.op.mode not in (constants.INSTANCE_CREATE,
4141
                            constants.INSTANCE_IMPORT):
4142
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4143
                                 self.op.mode)
4144

    
4145
    # disk template and mirror node verification
4146
    if self.op.disk_template not in constants.DISK_TEMPLATES:
4147
      raise errors.OpPrereqError("Invalid disk template name")
4148

    
4149
    if self.op.hypervisor is None:
4150
      self.op.hypervisor = self.cfg.GetHypervisorType()
4151

    
4152
    cluster = self.cfg.GetClusterInfo()
4153
    enabled_hvs = cluster.enabled_hypervisors
4154
    if self.op.hypervisor not in enabled_hvs:
4155
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4156
                                 " cluster (%s)" % (self.op.hypervisor,
4157
                                  ",".join(enabled_hvs)))
4158

    
4159
    # check hypervisor parameter syntax (locally)
4160

    
4161
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4162
                                  self.op.hvparams)
4163
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4164
    hv_type.CheckParameterSyntax(filled_hvp)
4165

    
4166
    # fill and remember the beparams dict
4167
    utils.CheckBEParams(self.op.beparams)
4168
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4169
                                    self.op.beparams)
4170

    
4171
    #### instance parameters check
4172

    
4173
    # instance name verification
4174
    hostname1 = utils.HostInfo(self.op.instance_name)
4175
    self.op.instance_name = instance_name = hostname1.name
4176

    
4177
    # this is just a preventive check, but someone might still add this
4178
    # instance in the meantime, and creation will fail at lock-add time
4179
    if instance_name in self.cfg.GetInstanceList():
4180
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4181
                                 instance_name)
4182

    
4183
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4184

    
4185
    # NIC buildup
4186
    self.nics = []
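    # Each entry of self.op.nics is a dict; per the checks below, "ip" may be
    # missing/"none", constants.VALUE_AUTO (use the instance's resolved
    # address) or a literal IP, "mac" may be constants.VALUE_AUTO,
    # constants.VALUE_GENERATE or a literal MAC address, and "bridge"
    # defaults to the cluster's default bridge.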
4187
    for nic in self.op.nics:
4188
      # ip validity checks
4189
      ip = nic.get("ip", None)
4190
      if ip is None or ip.lower() == "none":
4191
        nic_ip = None
4192
      elif ip.lower() == constants.VALUE_AUTO:
4193
        nic_ip = hostname1.ip
4194
      else:
4195
        if not utils.IsValidIP(ip):
4196
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4197
                                     " like a valid IP" % ip)
4198
        nic_ip = ip
4199

    
4200
      # MAC address verification
4201
      mac = nic.get("mac", constants.VALUE_AUTO)
4202
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4203
        if not utils.IsValidMac(mac.lower()):
4204
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4205
                                     mac)
4206
      # bridge verification
4207
      bridge = nic.get("bridge", None)
4208
      if bridge is None:
4209
        bridge = self.cfg.GetDefBridge()
4210
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4211

    
4212
    # disk checks/pre-build
4213
    self.disks = []
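    # Each entry of self.op.disks is a dict with a mandatory integer "size"
    # and an optional "mode" from constants.DISK_ACCESS_SET (defaulting to
    # constants.DISK_RDWR); they are normalised into {"size", "mode"} pairs.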
4214
    for disk in self.op.disks:
4215
      mode = disk.get("mode", constants.DISK_RDWR)
4216
      if mode not in constants.DISK_ACCESS_SET:
4217
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4218
                                   mode)
4219
      size = disk.get("size", None)
4220
      if size is None:
4221
        raise errors.OpPrereqError("Missing disk size")
4222
      try:
4223
        size = int(size)
4224
      except ValueError:
4225
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4226
      self.disks.append({"size": size, "mode": mode})
4227

    
4228
    # used in CheckPrereq for ip ping check
4229
    self.check_ip = hostname1.ip
4230

    
4231
    # file storage checks
4232
    if (self.op.file_driver and
4233
        not self.op.file_driver in constants.FILE_DRIVER):
4234
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
4235
                                 self.op.file_driver)
4236

    
4237
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4238
      raise errors.OpPrereqError("File storage directory path not absolute")
4239

    
4240
    ### Node/iallocator related checks
4241
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
4242
      raise errors.OpPrereqError("One and only one of iallocator and primary"
4243
                                 " node must be given")
4244

    
4245
    if self.op.iallocator:
4246
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4247
    else:
4248
      self.op.pnode = self._ExpandNode(self.op.pnode)
4249
      nodelist = [self.op.pnode]
4250
      if self.op.snode is not None:
4251
        self.op.snode = self._ExpandNode(self.op.snode)
4252
        nodelist.append(self.op.snode)
4253
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")
4280

    
4281
  def _RunAllocator(self):
4282
    """Run the allocator based on input opcode.
4283

4284
    """
4285
    nics = [n.ToDict() for n in self.nics]
4286
    ial = IAllocator(self,
4287
                     mode=constants.IALLOCATOR_MODE_ALLOC,
4288
                     name=self.op.instance_name,
4289
                     disk_template=self.op.disk_template,
4290
                     tags=[],
4291
                     os=self.op.os_type,
4292
                     vcpus=self.be_full[constants.BE_VCPUS],
4293
                     mem_size=self.be_full[constants.BE_MEMORY],
4294
                     disks=self.disks,
4295
                     nics=nics,
4296
                     hypervisor=self.op.hypervisor,
4297
                     )
4298

    
4299
    ial.Run(self.op.iallocator)
4300

    
4301
    if not ial.success:
4302
      raise errors.OpPrereqError("Can't compute nodes using"
4303
                                 " iallocator '%s': %s" % (self.op.iallocator,
4304
                                                           ial.info))
4305
    if len(ial.nodes) != ial.required_nodes:
4306
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4307
                                 " of nodes (%s), required %s" %
4308
                                 (self.op.iallocator, len(ial.nodes),
4309
                                  ial.required_nodes))
4310
    self.op.pnode = ial.nodes[0]
4311
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4312
                 self.op.instance_name, self.op.iallocator,
4313
                 ", ".join(ial.nodes))
4314
    if ial.required_nodes == 2:
4315
      self.op.snode = ial.nodes[1]
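
    # Illustrative only: for a net-mirrored (DRBD) instance the allocator is
    # expected to answer with two nodes, e.g. (hypothetical values)
    #
    #   ial.success == True
    #   ial.required_nodes == 2
    #   ial.nodes == ["node1.example.com", "node2.example.com"]
    #
    # in which case the first entry becomes the primary and the second one
    # the secondary node, exactly as assigned above.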
4316

    
4317
  def BuildHooksEnv(self):
4318
    """Build hooks env.
4319

4320
    This runs on master, primary and secondary nodes of the instance.
4321

4322
    """
4323
    env = {
4324
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4325
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4326
      "INSTANCE_ADD_MODE": self.op.mode,
4327
      }
4328
    if self.op.mode == constants.INSTANCE_IMPORT:
4329
      env["INSTANCE_SRC_NODE"] = self.op.src_node
4330
      env["INSTANCE_SRC_PATH"] = self.op.src_path
4331
      env["INSTANCE_SRC_IMAGES"] = self.src_images
4332

    
4333
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4334
      primary_node=self.op.pnode,
4335
      secondary_nodes=self.secondaries,
4336
      status=self.instance_status,
4337
      os_type=self.op.os_type,
4338
      memory=self.be_full[constants.BE_MEMORY],
4339
      vcpus=self.be_full[constants.BE_VCPUS],
4340
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4341
    ))
4342

    
4343
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4344
          self.secondaries)
4345
    return env, nl, nl
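
    # A rough sketch of the hook environment assembled above (values are
    # hypothetical):
    #
    #   INSTANCE_DISK_TEMPLATE = "drbd8"
    #   INSTANCE_DISK_SIZE     = "10240,2048"
    #   INSTANCE_ADD_MODE      = "create"
    #
    # plus, for imports, the INSTANCE_SRC_* keys and the generic instance
    # keys contributed by _BuildInstanceHookEnv; the accompanying node list
    # is the master plus the instance's primary and secondary nodes.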
4346

    
4347

    
4348
  def CheckPrereq(self):
4349
    """Check prerequisites.
4350

4351
    """
4352
    if (not self.cfg.GetVGName() and
4353
        self.op.disk_template not in constants.DTS_NOT_LVM):
4354
      raise errors.OpPrereqError("Cluster does not support lvm-based"
4355
                                 " instances")
4356

    
4357

    
4358
    if self.op.mode == constants.INSTANCE_IMPORT:
4359
      src_node = self.op.src_node
4360
      src_path = self.op.src_path
4361

    
4362
      if src_node is None:
4363
        exp_list = self.rpc.call_export_list(
4364
          self.acquired_locks[locking.LEVEL_NODE])
4365
        found = False
4366
        for node in exp_list:
4367
          if not exp_list[node].failed and src_path in exp_list[node].data:
4368
            found = True
4369
            self.op.src_node = src_node = node
4370
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4371
                                                       src_path)
4372
            break
4373
        if not found:
4374
          raise errors.OpPrereqError("No export found for relative path %s" %
4375
                                      src_path)
4376

    
4377
      _CheckNodeOnline(self, src_node)
4378
      result = self.rpc.call_export_info(src_node, src_path)
4379
      result.Raise()
4380
      if not result.data:
4381
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
4382

    
4383
      export_info = result.data
4384
      if not export_info.has_section(constants.INISECT_EXP):
4385
        raise errors.ProgrammerError("Corrupted export config")
4386

    
4387
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
4388
      if (int(ei_version) != constants.EXPORT_VERSION):
4389
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4390
                                   (ei_version, constants.EXPORT_VERSION))
4391

    
4392
      # Check that the new instance doesn't have less disks than the export
4393
      instance_disks = len(self.disks)
4394
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4395
      if instance_disks < export_disks:
4396
        raise errors.OpPrereqError("Not enough disks to import."
4397
                                   " (instance: %d, export: %d)" %
4398
                                   (instance_disks, export_disks))
4399

    
4400
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4401
      disk_images = []
4402
      for idx in range(export_disks):
4403
        option = 'disk%d_dump' % idx
4404
        if export_info.has_option(constants.INISECT_INS, option):
4405
          # FIXME: are the old os-es, disk sizes, etc. useful?
4406
          export_name = export_info.get(constants.INISECT_INS, option)
4407
          image = os.path.join(src_path, export_name)
4408
          disk_images.append(image)
4409
        else:
4410
          disk_images.append(False)
4411

    
4412
      self.src_images = disk_images
4413

    
4414
      old_name = export_info.get(constants.INISECT_INS, 'name')
4415
      # FIXME: int() here could throw a ValueError on broken exports
4416
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4417
      if self.op.instance_name == old_name:
4418
        for idx, nic in enumerate(self.nics):
4419
          if nic.mac == constants.VALUE_AUTO and exp_nic_count > idx:
4420
            nic_mac_ini = 'nic%d_mac' % idx
4421
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
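
      # For orientation, the export info parsed in this branch is a plain
      # INI-style file, roughly of this shape (section names are the values
      # of constants.INISECT_EXP / INISECT_INS, all values illustrative):
      #
      #   [<INISECT_EXP>]
      #   version = 0
      #   os = debian-etch
      #
      #   [<INISECT_INS>]
      #   name = old-name.example.com
      #   disk_count = 1
      #   disk0_dump = <dump file name>
      #   nic_count = 1
      #   nic0_mac = aa:00:00:11:22:33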
4422

    
4423
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
4424
    if self.op.start and not self.op.ip_check:
4425
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4426
                                 " adding an instance in start mode")
4427

    
4428
    if self.op.ip_check:
4429
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4430
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
4431
                                   (self.check_ip, self.op.instance_name))
4432

    
4433
    #### allocator run
4434

    
4435
    if self.op.iallocator is not None:
4436
      self._RunAllocator()
4437

    
4438
    #### node related checks
4439

    
4440
    # check primary node
4441
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4442
    assert self.pnode is not None, \
4443
      "Cannot retrieve locked node %s" % self.op.pnode
4444
    if pnode.offline:
4445
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4446
                                 pnode.name)
4447

    
4448
    self.secondaries = []
4449

    
4450
    # mirror node verification
4451
    if self.op.disk_template in constants.DTS_NET_MIRROR:
4452
      if self.op.snode is None:
4453
        raise errors.OpPrereqError("The networked disk templates need"
4454
                                   " a mirror node")
4455
      if self.op.snode == pnode.name:
4456
        raise errors.OpPrereqError("The secondary node cannot be"
4457
                                   " the primary node.")
4458
      self.secondaries.append(self.op.snode)
4459
      _CheckNodeOnline(self, self.op.snode)
4460

    
4461
    nodenames = [pnode.name] + self.secondaries
4462

    
4463
    req_size = _ComputeDiskSize(self.op.disk_template,
4464
                                self.disks)
4465

    
4466
    # Check lv size requirements
4467
    if req_size is not None:
4468
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4469
                                         self.op.hypervisor)
4470
      for node in nodenames:
4471
        info = nodeinfo[node]
4472
        info.Raise()
4473
        info = info.data
4474
        if not info:
4475
          raise errors.OpPrereqError("Cannot get current information"
4476
                                     " from node '%s'" % node)
4477
        vg_free = info.get('vg_free', None)
4478
        if not isinstance(vg_free, int):
4479
          raise errors.OpPrereqError("Can't compute free disk space on"
4480
                                     " node %s" % node)
4481
        if req_size > info['vg_free']:
4482
          raise errors.OpPrereqError("Not enough disk space on target node %s."
4483
                                     " %d MB available, %d MB required" %
4484
                                     (node, info['vg_free'], req_size))
4485

    
4486
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4487

    
4488
    # os verification
4489
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4490
    result.Raise()
4491
    if not isinstance(result.data, objects.OS):
4492
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
4493
                                 " primary node"  % self.op.os_type)
4494

    
4495
    # bridge check on primary node
4496
    bridges = [n.bridge for n in self.nics]
4497
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4498
    result.Raise()
4499
    if not result.data:
4500
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
4501
                                 " exist on destination node '%s'" %
4502
                                 (",".join(bridges), pnode.name))
4503

    
4504
    # memory check on primary node
4505
    if self.op.start:
4506
      _CheckNodeFreeMemory(self, self.pnode.name,
4507
                           "creating instance %s" % self.op.instance_name,
4508
                           self.be_full[constants.BE_MEMORY],
4509
                           self.op.hypervisor)
4510

    
4511
    self.instance_status = self.op.start
4512

    
4513
  def Exec(self, feedback_fn):
4514
    """Create and add the instance to the cluster.
4515

4516
    """
4517
    instance = self.op.instance_name
4518
    pnode_name = self.pnode.name
4519

    
4520
    for nic in self.nics:
4521
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4522
        nic.mac = self.cfg.GenerateMAC()
4523

    
4524
    ht_kind = self.op.hypervisor
4525
    if ht_kind in constants.HTS_REQ_PORT:
4526
      network_port = self.cfg.AllocatePort()
4527
    else:
4528
      network_port = None
4529

    
4530
    ##if self.op.vnc_bind_address is None:
4531
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4532

    
4533
    # this is needed because os.path.join does not accept None arguments
4534
    if self.op.file_storage_dir is None:
4535
      string_file_storage_dir = ""
4536
    else:
4537
      string_file_storage_dir = self.op.file_storage_dir
4538

    
4539
    # build the full file storage dir path
4540
    file_storage_dir = os.path.normpath(os.path.join(
4541
                                        self.cfg.GetFileStorageDir(),
4542
                                        string_file_storage_dir, instance))
4543

    
4544

    
4545
    disks = _GenerateDiskTemplate(self,
4546
                                  self.op.disk_template,
4547
                                  instance, pnode_name,
4548
                                  self.secondaries,
4549
                                  self.disks,
4550
                                  file_storage_dir,
4551
                                  self.op.file_driver,
4552
                                  0)
4553

    
4554
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4555
                            primary_node=pnode_name,
4556
                            nics=self.nics, disks=disks,
4557
                            disk_template=self.op.disk_template,
4558
                            admin_up=self.instance_status,
4559
                            network_port=network_port,
4560
                            beparams=self.op.beparams,
4561
                            hvparams=self.op.hvparams,
4562
                            hypervisor=self.op.hypervisor,
4563
                            )
4564

    
4565
    feedback_fn("* creating instance disks...")
4566
    try:
4567
      _CreateDisks(self, iobj)
4568
    except errors.OpExecError:
4569
      self.LogWarning("Device creation failed, reverting...")
4570
      try:
4571
        _RemoveDisks(self, iobj)
4572
      finally:
4573
        self.cfg.ReleaseDRBDMinors(instance)
4574
        raise
4575

    
4576
    feedback_fn("adding instance %s to cluster config" % instance)
4577

    
4578
    self.cfg.AddInstance(iobj)
4579
    # Declare that we don't want to remove the instance lock anymore, as we've
4580
    # added the instance to the config
4581
    del self.remove_locks[locking.LEVEL_INSTANCE]
4582
    # Unlock all the nodes
4583
    if self.op.mode == constants.INSTANCE_IMPORT:
4584
      nodes_keep = [self.op.src_node]
4585
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4586
                       if node != self.op.src_node]
4587
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4588
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4589
    else:
4590
      self.context.glm.release(locking.LEVEL_NODE)
4591
      del self.acquired_locks[locking.LEVEL_NODE]
4592

    
4593
    if self.op.wait_for_sync:
4594
      disk_abort = not _WaitForSync(self, iobj)
4595
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4596
      # make sure the disks are not degraded (still sync-ing is ok)
4597
      time.sleep(15)
4598
      feedback_fn("* checking mirrors status")
4599
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4600
    else:
4601
      disk_abort = False
4602

    
4603
    if disk_abort:
4604
      _RemoveDisks(self, iobj)
4605
      self.cfg.RemoveInstance(iobj.name)
4606
      # Make sure the instance lock gets removed
4607
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4608
      raise errors.OpExecError("There are some degraded disks for"
4609
                               " this instance")
4610

    
4611
    feedback_fn("creating os for instance %s on node %s" %
4612
                (instance, pnode_name))
4613

    
4614
    if iobj.disk_template != constants.DT_DISKLESS:
4615
      if self.op.mode == constants.INSTANCE_CREATE:
4616
        feedback_fn("* running the instance OS create scripts...")
4617
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4618
        msg = result.RemoteFailMsg()
4619
        if msg:
4620
          raise errors.OpExecError("Could not add os for instance %s"
4621
                                   " on node %s: %s" %
4622
                                   (instance, pnode_name, msg))
4623

    
4624
      elif self.op.mode == constants.INSTANCE_IMPORT:
4625
        feedback_fn("* running the instance OS import scripts...")
4626
        src_node = self.op.src_node
4627
        src_images = self.src_images
4628
        cluster_name = self.cfg.GetClusterName()
4629
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4630
                                                         src_node, src_images,
4631
                                                         cluster_name)
4632
        import_result.Raise()
4633
        for idx, result in enumerate(import_result.data):
4634
          if not result:
4635
            self.LogWarning("Could not import the image %s for instance"
4636
                            " %s, disk %d, on node %s" %
4637
                            (src_images[idx], instance, idx, pnode_name))
4638
      else:
4639
        # also checked in the prereq part
4640
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4641
                                     % self.op.mode)
4642

    
4643
    if self.op.start:
4644
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4645
      feedback_fn("* starting instance...")
4646
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
4647
      msg = result.RemoteFailMsg()
4648
      if msg:
4649
        raise errors.OpExecError("Could not start instance: %s" % msg)
4650

    
4651

    
4652
class LUConnectConsole(NoHooksLU):
4653
  """Connect to an instance's console.
4654

4655
  This is somewhat special in that it returns the command line that
4656
  you need to run on the master node in order to connect to the
4657
  console.
4658

4659
  """
4660
  _OP_REQP = ["instance_name"]
4661
  REQ_BGL = False
4662

    
4663
  def ExpandNames(self):
4664
    self._ExpandAndLockInstance()
4665

    
4666
  def CheckPrereq(self):
4667
    """Check prerequisites.
4668

4669
    This checks that the instance is in the cluster.
4670

4671
    """
4672
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4673
    assert self.instance is not None, \
4674
      "Cannot retrieve locked instance %s" % self.op.instance_name
4675
    _CheckNodeOnline(self, self.instance.primary_node)
4676

    
4677
  def Exec(self, feedback_fn):
    """Connect to the console of an instance.
4679

4680
    """
4681
    instance = self.instance
4682
    node = instance.primary_node
4683

    
4684
    node_insts = self.rpc.call_instance_list([node],
4685
                                             [instance.hypervisor])[node]
4686
    node_insts.Raise()
4687

    
4688
    if instance.name not in node_insts.data:
4689
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4690

    
4691
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4692

    
4693
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4694
    console_cmd = hyper.GetShellCommandForConsole(instance)
4695

    
4696
    # build ssh cmdline
4697
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
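
    # The returned command line is meant to be run (by the calling client)
    # on the master node. As a hedged illustration, for a Xen-based instance
    # it ends up looking roughly like
    #
    #   ssh -t root@node1.example.com 'xm console instance1.example.com'
    #
    # with the inner console command supplied by the hypervisor abstraction
    # via GetShellCommandForConsole above.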
4698

    
4699

    
4700
class LUReplaceDisks(LogicalUnit):
4701
  """Replace the disks of an instance.
4702

4703
  """
4704
  HPATH = "mirrors-replace"
4705
  HTYPE = constants.HTYPE_INSTANCE
4706
  _OP_REQP = ["instance_name", "mode", "disks"]
4707
  REQ_BGL = False
4708

    
4709
  def CheckArguments(self):
4710
    if not hasattr(self.op, "remote_node"):
4711
      self.op.remote_node = None
4712
    if not hasattr(self.op, "iallocator"):
4713
      self.op.iallocator = None
4714

    
4715
    # check for valid parameter combination
4716
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
4717
    if self.op.mode == constants.REPLACE_DISK_CHG:
4718
      if cnt == 2:
4719
        raise errors.OpPrereqError("When changing the secondary either an"
4720
                                   " iallocator script must be used or the"
4721
                                   " new node given")
4722
      elif cnt == 0:
4723
        raise errors.OpPrereqError("Give either the iallocator or the new"
4724
                                   " secondary, not both")
4725
    else: # not replacing the secondary
4726
      if cnt != 2:
4727
        raise errors.OpPrereqError("The iallocator and new node options can"
4728
                                   " be used only when changing the"
4729
                                   " secondary node")
4730
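
    # Summary of the combinations accepted above:
    #
    #   mode                     remote_node   iallocator
    #   REPLACE_DISK_PRI/_SEC    unset         unset
    #   REPLACE_DISK_CHG         set           unset
    #   REPLACE_DISK_CHG         unset         set
    #
    # i.e. a replacement node (given explicitly or via an allocator script)
    # is accepted, and then required, only when changing the secondary.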

    
4731
  def ExpandNames(self):
4732
    self._ExpandAndLockInstance()
4733

    
4734
    if self.op.iallocator is not None:
4735
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4736
    elif self.op.remote_node is not None:
4737
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4738
      if remote_node is None:
4739
        raise errors.OpPrereqError("Node '%s' not known" %
4740
                                   self.op.remote_node)
4741
      self.op.remote_node = remote_node
4742
      # Warning: do not remove the locking of the new secondary here
4743
      # unless DRBD8.AddChildren is changed to work in parallel;
4744
      # currently it doesn't since parallel invocations of
4745
      # FindUnusedMinor will conflict
4746
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4747
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4748
    else:
4749
      self.needed_locks[locking.LEVEL_NODE] = []
4750
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4751

    
4752
  def DeclareLocks(self, level):
4753
    # If we're not already locking all nodes in the set we have to declare the
4754
    # instance's primary/secondary nodes.
4755
    if (level == locking.LEVEL_NODE and
4756
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4757
      self._LockInstancesNodes()
4758

    
4759
  def _RunAllocator(self):
4760
    """Compute a new secondary node using an IAllocator.
4761

4762
    """
4763
    ial = IAllocator(self,
4764
                     mode=constants.IALLOCATOR_MODE_RELOC,
4765
                     name=self.op.instance_name,
4766
                     relocate_from=[self.sec_node])
4767

    
4768
    ial.Run(self.op.iallocator)
4769

    
4770
    if not ial.success:
4771
      raise errors.OpPrereqError("Can't compute nodes using"
4772
                                 " iallocator '%s': %s" % (self.op.iallocator,
4773
                                                           ial.info))
4774
    if len(ial.nodes) != ial.required_nodes:
4775
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4776
                                 " of nodes (%s), required %s" %
4777
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
4778
    self.op.remote_node = ial.nodes[0]
4779
    self.LogInfo("Selected new secondary for the instance: %s",
4780
                 self.op.remote_node)
4781

    
4782
  def BuildHooksEnv(self):
4783
    """Build hooks env.
4784

4785
    This runs on the master, the primary and all the secondaries.
4786

4787
    """
4788
    env = {
4789
      "MODE": self.op.mode,
4790
      "NEW_SECONDARY": self.op.remote_node,
4791
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4792
      }
4793
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4794
    nl = [
4795
      self.cfg.GetMasterNode(),
4796
      self.instance.primary_node,
4797
      ]
4798
    if self.op.remote_node is not None:
4799
      nl.append(self.op.remote_node)
4800
    return env, nl, nl
4801

    
4802
  def CheckPrereq(self):
4803
    """Check prerequisites.
4804

4805
    This checks that the instance is in the cluster.
4806

4807
    """
4808
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4809
    assert instance is not None, \
4810
      "Cannot retrieve locked instance %s" % self.op.instance_name
4811
    self.instance = instance
4812

    
4813
    if instance.disk_template != constants.DT_DRBD8:
4814
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4815
                                 " instances")
4816

    
4817
    if len(instance.secondary_nodes) != 1:
4818
      raise errors.OpPrereqError("The instance has a strange layout,"
4819
                                 " expected one secondary but found %d" %
4820
                                 len(instance.secondary_nodes))
4821

    
4822
    self.sec_node = instance.secondary_nodes[0]
4823

    
4824
    if self.op.iallocator is not None:
4825
      self._RunAllocator()
4826

    
4827
    remote_node = self.op.remote_node
4828
    if remote_node is not None:
4829
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4830
      assert self.remote_node_info is not None, \
4831
        "Cannot retrieve locked node %s" % remote_node
4832
    else:
4833
      self.remote_node_info = None
4834
    if remote_node == instance.primary_node:
4835
      raise errors.OpPrereqError("The specified node is the primary node of"
4836
                                 " the instance.")
4837
    elif remote_node == self.sec_node:
4838
      raise errors.OpPrereqError("The specified node is already the"
4839
                                 " secondary node of the instance.")
4840

    
4841
    if self.op.mode == constants.REPLACE_DISK_PRI:
4842
      n1 = self.tgt_node = instance.primary_node
4843
      n2 = self.oth_node = self.sec_node
4844
    elif self.op.mode == constants.REPLACE_DISK_SEC:
4845
      n1 = self.tgt_node = self.sec_node
4846
      n2 = self.oth_node = instance.primary_node
4847
    elif self.op.mode == constants.REPLACE_DISK_CHG:
4848
      n1 = self.new_node = remote_node
4849
      n2 = self.oth_node = instance.primary_node
4850
      self.tgt_node = self.sec_node
4851
    else:
4852
      raise errors.ProgrammerError("Unhandled disk replace mode")
4853

    
4854
    _CheckNodeOnline(self, n1)
4855
    _CheckNodeOnline(self, n2)
4856

    
4857
    if not self.op.disks:
4858
      self.op.disks = range(len(instance.disks))
4859

    
4860
    for disk_idx in self.op.disks:
4861
      instance.FindDisk(disk_idx)
4862

    
4863
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.
4865

4866
    The algorithm for replace is quite complicated:
4867

4868
      1. for each disk to be replaced:
4869

4870
        1. create new LVs on the target node with unique names
4871
        1. detach old LVs from the drbd device
4872
        1. rename old LVs to name_replaced.<time_t>
4873
        1. rename new LVs to old LVs
4874
        1. attach the new LVs (with the old names now) to the drbd device
4875

4876
      1. wait for sync across all devices
4877

4878
      1. for each modified disk:
4879

4880
        1. remove old LVs (which have the name name_replaces.<time_t>)
4881

4882
    Failures are not very well handled.
4883

4884
    """
4885
    steps_total = 6
4886
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4887
    instance = self.instance
4888
    iv_names = {}
4889
    vgname = self.cfg.GetVGName()
4890
    # start of work
4891
    cfg = self.cfg
4892
    tgt_node = self.tgt_node
4893
    oth_node = self.oth_node
4894

    
4895
    # Step: check device activation
4896
    self.proc.LogStep(1, steps_total, "check device existence")
4897
    info("checking volume groups")
4898
    my_vg = cfg.GetVGName()
4899
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4900
    if not results:
4901
      raise errors.OpExecError("Can't list volume groups on the nodes")
4902
    for node in oth_node, tgt_node:
4903
      res = results[node]
4904
      if res.failed or not res.data or my_vg not in res.data:
4905
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4906
                                 (my_vg, node))
4907
    for idx, dev in enumerate(instance.disks):
4908
      if idx not in self.op.disks:
4909
        continue
4910
      for node in tgt_node, oth_node:
4911
        info("checking disk/%d on %s" % (idx, node))
4912
        cfg.SetDiskID(dev, node)
4913
        if not self.rpc.call_blockdev_find(node, dev):
4914
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4915
                                   (idx, node))
4916

    
4917
    # Step: check other node consistency
4918
    self.proc.LogStep(2, steps_total, "check peer consistency")
4919
    for idx, dev in enumerate(instance.disks):
4920
      if idx not in self.op.disks:
4921
        continue
4922
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4923
      if not _CheckDiskConsistency(self, dev, oth_node,
4924
                                   oth_node==instance.primary_node):
4925
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4926
                                 " to replace disks on this node (%s)" %
4927
                                 (oth_node, tgt_node))
4928

    
4929
    # Step: create new storage
4930
    self.proc.LogStep(3, steps_total, "allocate new storage")
4931
    for idx, dev in enumerate(instance.disks):
4932
      if idx not in self.op.disks:
4933
        continue
4934
      size = dev.size
4935
      cfg.SetDiskID(dev, tgt_node)
4936
      lv_names = [".disk%d_%s" % (idx, suf)
4937
                  for suf in ["data", "meta"]]
4938
      names = _GenerateUniqueNames(self, lv_names)
4939
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4940
                             logical_id=(vgname, names[0]))
4941
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4942
                             logical_id=(vgname, names[1]))
4943
      new_lvs = [lv_data, lv_meta]
4944
      old_lvs = dev.children
4945
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4946
      info("creating new local storage on %s for %s" %
4947
           (tgt_node, dev.iv_name))
4948
      # we pass force_create=True to force the LVM creation
4949
      for new_lv in new_lvs:
4950
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4951
                        _GetInstanceInfoText(instance), False)
4952

    
4953
    # Step: for each lv, detach+rename*2+attach
4954
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4955
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4956
      info("detaching %s drbd from local storage" % dev.iv_name)
4957
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4958
      result.Raise()
4959
      if not result.data:
4960
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4961
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4962
      #dev.children = []
4963
      #cfg.Update(instance)
4964

    
4965
      # ok, we created the new LVs, so now we know we have the needed
4966
      # storage; as such, we proceed on the target node to rename
4967
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4968
      # using the assumption that logical_id == physical_id (which in
4969
      # turn is the unique_id on that node)
4970

    
4971
      # FIXME(iustin): use a better name for the replaced LVs
4972
      temp_suffix = int(time.time())
4973
      ren_fn = lambda d, suff: (d.physical_id[0],
4974
                                d.physical_id[1] + "_replaced-%s" % suff)
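
      # E.g. (volume names purely hypothetical): an old LV with physical_id
      # ("xenvg", "abc-disk0-data") is renamed to
      # ("xenvg", "abc-disk0-data_replaced-1215000000"), which frees its
      # original name for the freshly created replacement LV below.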
4975
      # build the rename list based on what LVs exist on the node
4976
      rlist = []
4977
      for to_ren in old_lvs:
4978
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4979
        if not find_res.failed and find_res.data is not None: # device exists
4980
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4981

    
4982
      info("renaming the old LVs on the target node")
4983
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4984
      result.Raise()
4985
      if not result.data:
4986
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4987
      # now we rename the new LVs to the old LVs
4988
      info("renaming the new LVs on the target node")
4989
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4990
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4991
      result.Raise()
4992
      if not result.data:
4993
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4994

    
4995
      for old, new in zip(old_lvs, new_lvs):
4996
        new.logical_id = old.logical_id
4997
        cfg.SetDiskID(new, tgt_node)
4998

    
4999
      for disk in old_lvs:
5000
        disk.logical_id = ren_fn(disk, temp_suffix)
5001
        cfg.SetDiskID(disk, tgt_node)
5002

    
5003
      # now that the new lvs have the old name, we can add them to the device
5004
      info("adding new mirror component on %s" % tgt_node)
5005
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5006
      if result.failed or not result.data:
5007
        for new_lv in new_lvs:
5008
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
5009
          if result.failed or not result.data:
5010
            warning("Can't rollback device %s", new_lv,
                    hint="manually cleanup unused logical volumes")
5012
        raise errors.OpExecError("Can't add local storage to drbd")
5013

    
5014
      dev.children = new_lvs
5015
      cfg.Update(instance)
5016

    
5017
    # Step: wait for sync
5018

    
5019
    # this can fail as the old devices are degraded and _WaitForSync
5020
    # does a combined result over all disks, so we don't check its
5021
    # return value
5022
    self.proc.LogStep(5, steps_total, "sync devices")
5023
    _WaitForSync(self, instance, unlock=True)
5024

    
5025
    # so check manually all the devices
5026
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5027
      cfg.SetDiskID(dev, instance.primary_node)
5028
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5029
      if result.failed or result.data[5]:
5030
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
5031

    
5032
    # Step: remove old storage
5033
    self.proc.LogStep(6, steps_total, "removing old storage")
5034
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5035
      info("remove logical volumes for %s" % name)
5036
      for lv in old_lvs:
5037
        cfg.SetDiskID(lv, tgt_node)
5038
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
5039
        if result.failed or not result.data:
5040
          warning("Can't remove old LV", hint="manually remove unused LVs")
5041
          continue
5042

    
5043
  def _ExecD8Secondary(self, feedback_fn):
5044
    """Replace the secondary node for drbd8.
5045

5046
    The algorithm for replace is quite complicated:
5047
      - for all disks of the instance:
5048
        - create new LVs on the new node with same names
5049
        - shutdown the drbd device on the old secondary
5050
        - disconnect the drbd network on the primary
5051
        - create the drbd device on the new secondary
5052
        - network attach the drbd on the primary, using an artifice:
5053
          the drbd code for Attach() will connect to the network if it
5054
          finds a device which is connected to the good local disks but
5055
          not network enabled
5056
      - wait for sync across all devices
5057
      - remove all disks from the old secondary
5058

5059
    Failures are not very well handled.
5060

5061
    """
5062
    steps_total = 6
5063
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5064
    instance = self.instance
5065
    iv_names = {}
5066
    # start of work
5067
    cfg = self.cfg
5068
    old_node = self.tgt_node
5069
    new_node = self.new_node
5070
    pri_node = instance.primary_node
5071
    nodes_ip = {
5072
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5073
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5074
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5075
      }
5076

    
5077
    # Step: check device activation
5078
    self.proc.LogStep(1, steps_total, "check device existence")
5079
    info("checking volume groups")
5080
    my_vg = cfg.GetVGName()
5081
    results = self.rpc.call_vg_list([pri_node, new_node])
5082
    for node in pri_node, new_node:
5083
      res = results[node]
5084
      if res.failed or not res.data or my_vg not in res.data:
5085
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5086
                                 (my_vg, node))
5087
    for idx, dev in enumerate(instance.disks):
5088
      if idx not in self.op.disks:
5089
        continue
5090
      info("checking disk/%d on %s" % (idx, pri_node))
5091
      cfg.SetDiskID(dev, pri_node)
5092
      result = self.rpc.call_blockdev_find(pri_node, dev)
5093
      result.Raise()
5094
      if not result.data:
5095
        raise errors.OpExecError("Can't find disk/%d on node %s" %
5096
                                 (idx, pri_node))
5097

    
5098
    # Step: check other node consistency
5099
    self.proc.LogStep(2, steps_total, "check peer consistency")
5100
    for idx, dev in enumerate(instance.disks):
5101
      if idx not in self.op.disks:
5102
        continue
5103
      info("checking disk/%d consistency on %s" % (idx, pri_node))
5104
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5105
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
5106
                                 " unsafe to replace the secondary" %
5107
                                 pri_node)
5108

    
5109
    # Step: create new storage
5110
    self.proc.LogStep(3, steps_total, "allocate new storage")
5111
    for idx, dev in enumerate(instance.disks):
5112
      info("adding new local storage on %s for disk/%d" %
5113
           (new_node, idx))
5114
      # we pass force_create=True to force LVM creation
5115
      for new_lv in dev.children:
5116
        _CreateBlockDev(self, new_node, instance, new_lv, True,
5117
                        _GetInstanceInfoText(instance), False)
5118

    
5119
    # Step 4: drbd minors and drbd setup changes
5120
    # after this, we must manually remove the drbd minors on both the
5121
    # error and the success paths
5122
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5123
                                   instance.name)
5124
    logging.debug("Allocated minors %s" % (minors,))
5125
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
5126
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5127
      size = dev.size
5128
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5129
      # create new devices on new_node; note that we create two IDs:
5130
      # one without port, so the drbd will be activated without
5131
      # networking information on the new node at this stage, and one
5132
      # with network, for the latter activation in step 4
5133
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5134
      if pri_node == o_node1:
5135
        p_minor = o_minor1
5136
      else:
5137
        p_minor = o_minor2
5138

    
5139
      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5140
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
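
      # The DRBD8 logical_id used here is the 6-tuple unpacked above:
      # (node_A, node_B, port, minor_A, minor_B, shared_secret).
      # new_alone_id deliberately carries port=None, so the device is first
      # brought up on the new node in standalone mode; new_net_id, with the
      # real port, is only applied when the network is attached later on.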
5141

    
5142
      iv_names[idx] = (dev, dev.children, new_net_id)
5143
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5144
                    new_net_id)
5145
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5146
                              logical_id=new_alone_id,
5147
                              children=dev.children)
5148
      try:
5149
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5150
                              _GetInstanceInfoText(instance), False)
5151
      except errors.BlockDeviceError:
5152
        self.cfg.ReleaseDRBDMinors(instance.name)
5153
        raise
5154

    
5155
    for idx, dev in enumerate(instance.disks):
5156
      # we have new devices, shutdown the drbd on the old secondary
5157
      info("shutting down drbd for disk/%d on old node" % idx)
5158
      cfg.SetDiskID(dev, old_node)
5159
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
5160
      if result.failed or not result.data:
5161
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5162
                hint="Please cleanup this device manually as soon as possible")
5163

    
5164
    info("detaching primary drbds from the network (=> standalone)")
5165
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5166
                                               instance.disks)[pri_node]
5167

    
5168
    msg = result.RemoteFailMsg()
5169
    if msg:
5170
      # detaches didn't succeed (unlikely)
5171
      self.cfg.ReleaseDRBDMinors(instance.name)
5172
      raise errors.OpExecError("Can't detach the disks from the network on"
5173
                               " old node: %s" % (msg,))
5174

    
5175
    # if we managed to detach at least one, we update all the disks of
5176
    # the instance to point to the new secondary
5177
    info("updating instance configuration")
5178
    for dev, _, new_logical_id in iv_names.itervalues():
5179
      dev.logical_id = new_logical_id
5180
      cfg.SetDiskID(dev, pri_node)
5181
    cfg.Update(instance)
5182

    
5183
    # and now perform the drbd attach
5184
    info("attaching primary drbds to new secondary (standalone => connected)")
5185
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5186
                                           instance.disks, instance.name,
5187
                                           False)
5188
    for to_node, to_result in result.items():
5189
      msg = to_result.RemoteFailMsg()
5190
      if msg:
5191
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
5192
                hint="please do a gnt-instance info to see the"
5193
                " status of disks")
5194

    
5195
    # this can fail as the old devices are degraded and _WaitForSync
5196
    # does a combined result over all disks, so we don't check its
5197
    # return value
5198
    self.proc.LogStep(5, steps_total, "sync devices")
5199
    _WaitForSync(self, instance, unlock=True)
5200

    
5201
    # so check manually all the devices
5202
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5203
      cfg.SetDiskID(dev, pri_node)
5204
      result = self.rpc.call_blockdev_find(pri_node, dev)
5205
      result.Raise()
5206
      if result.data[5]:
5207
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5208

    
5209
    self.proc.LogStep(6, steps_total, "removing old storage")
5210
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5211
      info("remove logical volumes for disk/%d" % idx)
5212
      for lv in old_lvs:
5213
        cfg.SetDiskID(lv, old_node)
5214
        result = self.rpc.call_blockdev_remove(old_node, lv)
5215
        if result.failed or not result.data:
5216
          warning("Can't remove LV on old secondary",
5217
                  hint="Cleanup stale volumes by hand")
5218

    
5219
  def Exec(self, feedback_fn):
5220
    """Execute disk replacement.
5221

5222
    This dispatches the disk replacement to the appropriate handler.
5223

5224
    """
5225
    instance = self.instance
5226

    
5227
    # Activate the instance disks if we're replacing them on a down instance
5228
    if not instance.admin_up:
5229
      _StartInstanceDisks(self, instance, True)
5230

    
5231
    if self.op.mode == constants.REPLACE_DISK_CHG:
5232
      fn = self._ExecD8Secondary
5233
    else:
5234
      fn = self._ExecD8DiskOnly
5235

    
5236
    ret = fn(feedback_fn)
5237

    
5238
    # Deactivate the instance disks if we're replacing them on a down instance
5239
    if not instance.admin_up:
5240
      _SafeShutdownInstanceDisks(self, instance)
5241

    
5242
    return ret
5243

    
5244

    
5245
class LUGrowDisk(LogicalUnit):
5246
  """Grow a disk of an instance.
5247

5248
  """
5249
  HPATH = "disk-grow"
5250
  HTYPE = constants.HTYPE_INSTANCE
5251
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5252
  REQ_BGL = False
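
  # A hedged usage sketch (the opcode name and field names are assumed from
  # _OP_REQP above, not verified against opcodes.py):
  #
  #   op = opcodes.OpGrowDisk(instance_name="instance1.example.com",
  #                           disk=0, amount=1024, wait_for_sync=True)
  #
  # which grows disk 0 by 1024 MiB on every node holding it and optionally
  # waits for the mirror to resync.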
5253

    
5254
  def ExpandNames(self):
5255
    self._ExpandAndLockInstance()
5256
    self.needed_locks[locking.LEVEL_NODE] = []
5257
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5258

    
5259
  def DeclareLocks(self, level):
5260
    if level == locking.LEVEL_NODE:
5261
      self._LockInstancesNodes()
5262

    
5263
  def BuildHooksEnv(self):
5264
    """Build hooks env.
5265

5266
    This runs on the master, the primary and all the secondaries.
5267

5268
    """
5269
    env = {
5270
      "DISK": self.op.disk,
5271
      "AMOUNT": self.op.amount,
5272
      }
5273
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5274
    nl = [
5275
      self.cfg.GetMasterNode(),
5276
      self.instance.primary_node,
5277
      ]
5278
    return env, nl, nl
5279

    
5280
  def CheckPrereq(self):
5281
    """Check prerequisites.
5282

5283
    This checks that the instance is in the cluster.
5284

5285
    """
5286
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5287
    assert instance is not None, \
5288
      "Cannot retrieve locked instance %s" % self.op.instance_name
5289
    nodenames = list(instance.all_nodes)
5290
    for node in nodenames:
5291
      _CheckNodeOnline(self, node)
5292

    
5293

    
5294
    self.instance = instance
5295

    
5296
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5297
      raise errors.OpPrereqError("Instance's disk layout does not support"
5298
                                 " growing.")
5299

    
5300
    self.disk = instance.FindDisk(self.op.disk)
5301

    
5302
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5303
                                       instance.hypervisor)
5304
    for node in nodenames:
5305
      info = nodeinfo[node]
5306
      if info.failed or not info.data:
5307
        raise errors.OpPrereqError("Cannot get current information"
5308
                                   " from node '%s'" % node)
5309
      vg_free = info.data.get('vg_free', None)
5310
      if not isinstance(vg_free, int):
5311
        raise errors.OpPrereqError("Can't compute free disk space on"
5312
                                   " node %s" % node)
5313
      if self.op.amount > vg_free:
5314
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
5315
                                   " %d MiB available, %d MiB required" %
5316
                                   (node, vg_free, self.op.amount))
5317

    
5318
  def Exec(self, feedback_fn):
5319
    """Execute disk grow.
5320

5321
    """
5322
    instance = self.instance
5323
    disk = self.disk
5324
    for node in instance.all_nodes:
5325
      self.cfg.SetDiskID(disk, node)
5326
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5327
      result.Raise()
5328
      if (not result.data or not isinstance(result.data, (list, tuple)) or
5329
          len(result.data) != 2):
5330
        raise errors.OpExecError("Grow request failed to node %s" % node)
5331
      elif not result.data[0]:
5332
        raise errors.OpExecError("Grow request failed to node %s: %s" %
5333
                                 (node, result.data[1]))
5334
    disk.RecordGrow(self.op.amount)
5335
    self.cfg.Update(instance)
5336
    if self.op.wait_for_sync:
5337
      disk_abort = not _WaitForSync(self, instance)
5338
      if disk_abort:
5339
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5340
                             " status.\nPlease check the instance.")
5341

    
5342

    
5343
class LUQueryInstanceData(NoHooksLU):
5344
  """Query runtime instance data.
5345

5346
  """
5347
  _OP_REQP = ["instances", "static"]
5348
  REQ_BGL = False
5349

    
5350
  def ExpandNames(self):
5351
    self.needed_locks = {}
5352
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5353

    
5354
    if not isinstance(self.op.instances, list):
5355
      raise errors.OpPrereqError("Invalid argument type 'instances'")
5356

    
5357
    if self.op.instances:
5358
      self.wanted_names = []
5359
      for name in self.op.instances:
5360
        full_name = self.cfg.ExpandInstanceName(name)
5361
        if full_name is None:
5362
          raise errors.OpPrereqError("Instance '%s' not known" % name)
5363
        self.wanted_names.append(full_name)
5364
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5365
    else:
5366
      self.wanted_names = None
5367
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5368

    
5369
    self.needed_locks[locking.LEVEL_NODE] = []
5370
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5371

    
5372
  def DeclareLocks(self, level):
5373
    if level == locking.LEVEL_NODE:
5374
      self._LockInstancesNodes()
5375

    
5376
  def CheckPrereq(self):
5377
    """Check prerequisites.
5378

5379
    This only checks the optional instance list against the existing names.
5380

5381
    """
5382
    if self.wanted_names is None:
5383
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5384

    
5385
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5386
                             in self.wanted_names]
5387
    return
5388

    
5389
  def _ComputeDiskStatus(self, instance, snode, dev):
5390
    """Compute block device status.
5391

5392
    """
5393
    static = self.op.static
5394
    if not static:
5395
      self.cfg.SetDiskID(dev, instance.primary_node)
5396
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5397
      dev_pstatus.Raise()
5398
      dev_pstatus = dev_pstatus.data
5399
    else:
5400
      dev_pstatus = None
5401

    
5402
    if dev.dev_type in constants.LDS_DRBD:
5403
      # we change the snode then (otherwise we use the one passed in)
5404
      if dev.logical_id[0] == instance.primary_node:
5405
        snode = dev.logical_id[1]
5406
      else:
5407
        snode = dev.logical_id[0]
5408

    
5409
    if snode and not static:
5410
      self.cfg.SetDiskID(dev, snode)
5411
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5412
      dev_sstatus.Raise()
5413
      dev_sstatus = dev_sstatus.data
5414
    else:
5415
      dev_sstatus = None
5416

    
5417
    if dev.children:
5418
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
5419
                      for child in dev.children]
5420
    else:
5421
      dev_children = []
5422

    
5423
    data = {
5424
      "iv_name": dev.iv_name,
5425
      "dev_type": dev.dev_type,
5426
      "logical_id": dev.logical_id,
5427
      "physical_id": dev.physical_id,
5428
      "pstatus": dev_pstatus,
5429
      "sstatus": dev_sstatus,
5430
      "children": dev_children,
5431
      "mode": dev.mode,
5432
      }
5433

    
5434
    return data
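
    # Sketch of the structure returned above for a DRBD disk (values
    # hypothetical; the two children are the backing LVs, computed by the
    # recursive call):
    #
    #   {"iv_name": "disk/0", "dev_type": constants.LD_DRBD8,
    #    "logical_id": (...), "physical_id": (...),
    #    "pstatus": <blockdev_find result from the primary, or None>,
    #    "sstatus": <the same from the secondary, or None>,
    #    "mode": "rw", "children": [<lv dict>, <lv dict>]}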
5435

    
5436
  def Exec(self, feedback_fn):
5437
    """Gather and return data"""
5438
    result = {}
5439

    
5440
    cluster = self.cfg.GetClusterInfo()
5441

    
5442
    for instance in self.wanted_instances:
5443
      if not self.op.static:
5444
        remote_info = self.rpc.call_instance_info(instance.primary_node,
5445
                                                  instance.name,
5446
                                                  instance.hypervisor)
5447
        remote_info.Raise()
5448
        remote_info = remote_info.data
5449
        if remote_info and "state" in remote_info:
5450
          remote_state = "up"
5451
        else:
5452
          remote_state = "down"
5453
      else:
5454
        remote_state = None
5455
      if instance.admin_up:
5456
        config_state = "up"
5457
      else:
5458
        config_state = "down"
5459

    
5460
      disks = [self._ComputeDiskStatus(instance, None, device)
5461
               for device in instance.disks]
5462

    
5463
      idict = {
5464
        "name": instance.name,
5465
        "config_state": config_state,
5466
        "run_state": remote_state,
5467
        "pnode": instance.primary_node,
5468
        "snodes": instance.secondary_nodes,
5469
        "os": instance.os,
5470
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5471
        "disks": disks,
5472
        "hypervisor": instance.hypervisor,
5473
        "network_port": instance.network_port,
5474
        "hv_instance": instance.hvparams,
5475
        "hv_actual": cluster.FillHV(instance),
5476
        "be_instance": instance.beparams,
5477
        "be_actual": cluster.FillBE(instance),
5478
        }
5479

    
5480
      result[instance.name] = idict
5481

    
5482
    return result
5483

    
5484

    
5485
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.
5487

5488
  """
5489
  HPATH = "instance-modify"
5490
  HTYPE = constants.HTYPE_INSTANCE
5491
  _OP_REQP = ["instance_name"]
5492
  REQ_BGL = False
5493

    
5494
  def CheckArguments(self):
5495
    if not hasattr(self.op, 'nics'):
5496
      self.op.nics = []
5497
    if not hasattr(self.op, 'disks'):
5498
      self.op.disks = []
5499
    if not hasattr(self.op, 'beparams'):
5500
      self.op.beparams = {}
5501
    if not hasattr(self.op, 'hvparams'):
5502
      self.op.hvparams = {}
5503
    self.op.force = getattr(self.op, "force", False)
5504
    if not (self.op.nics or self.op.disks or
5505
            self.op.hvparams or self.op.beparams):
5506
      raise errors.OpPrereqError("No changes submitted")
5507

    
5508
    utils.CheckBEParams(self.op.beparams)
5509

    
5510
    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == "none":
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
      # we can only check None bridges and assign the default one
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is None:
        nic_dict['bridge'] = self.cfg.GetDefBridge()
      # but we can validate MACs
      nic_mac = nic_dict.get('mac', None)
      if nic_mac is not None:
        if self.cfg.IsMacInUse(nic_mac):
          raise errors.OpPrereqError("MAC address %s already in use"
                                     " in cluster" % nic_mac)
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # FIXME: readd disk/nic changes
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
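      # VALUE_DEFAULT drops the per-instance override (the cluster default
      # applies again); VALUE_NONE stores an explicit None for the parameter.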
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        elif val == constants.VALUE_NONE:
          i_hvdict[key] = None
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = instance_info.data['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
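        # how much memory the instance would still be short of on the primary
        # node after the change: the requested memory, minus what the instance
        # currently uses, minus what the node reports as free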
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is not None:
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
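    # "result" collects (parameter, new value) pairs that are reported back to
    # the caller, e.g. [("disk/0", "remove"), ("be/memory", 512)] -- values
    # shown here are illustrative only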
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          rpc_result = self.rpc.call_blockdev_remove(node, disk)
          if rpc_result.failed or not rpc_result.data:
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                 " continuing anyway", device_idx, node)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # add a new nic
        if 'mac' not in nic_dict:
          mac = constants.VALUE_GENERATE
        else:
          mac = nic_dict['mac']
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          mac = self.cfg.GenerateMAC()
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=nic_dict.get('bridge', None))
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result

class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
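          # keep a placeholder so snap_disks stays index-aligned with
          # instance.disks for the export loop below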
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        result = self.rpc.call_blockdev_remove(src_node, dev)
        if result.failed or not result.data:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

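    # at this point "data" holds the generic keys (version, cluster_name,
    # cluster_tags, enabled_hypervisors, nodes, instances); the mode-specific
    # "request" key is added later by _AddNewInstance/_AddRelocateInstance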
    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

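    # A well-formed reply is expected to look roughly like
    #   {"success": True, "info": "...", "nodes": ["node1", "node2"]}
    # (node names illustrative only); anything else is rejected below.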
    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the test direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result