root / lib / cmdlib.py @ 6906a9d8
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True
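
  # A minimal sketch of a subclass following the rules above; the names
  # LUDoSomething, "do-something" and "some_param" are illustrative only
  # and are not part of the opcode/LU set defined in this module:
  #
  #   class LUDoSomething(LogicalUnit):
  #     HPATH = "do-something"
  #     HTYPE = constants.HTYPE_CLUSTER
  #     _OP_REQP = ["some_param"]
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {}
  #
  #     def CheckPrereq(self):
  #       pass
  #
  #     def BuildHooksEnv(self):
  #       return {"OP_TARGET": self.cfg.GetClusterName()}, [], []
  #
  #     def Exec(self, feedback_fn):
  #       feedback_fn("doing something")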

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing it separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled by the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
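
  # As a sketch (assumed, not prescribed by this base class), a typical
  # override mirrors what the concrete LUs below do: build a small dict and
  # return it together with the pre- and post-execution node lists, e.g.
  #
  #   env = {"OP_TARGET": self.cfg.GetClusterName()}
  #   mn = self.cfg.GetMasterNode()
  #   return env, [mn], [mn]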

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
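
  # Sketch of the intended call site (an assumption based on the docstring
  # above): an instance LU invokes this helper from its ExpandNames, e.g.
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()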

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
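
  # Putting the two helpers together, a concurrent instance LU would
  # typically declare its locks like this (illustrative sketch only):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()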


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
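
# Illustrative usage sketch (the field names and self.op.output_fields are
# hypothetical here): a query LU validates its requested output fields via
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("mfree", "dfree"),
#                      selected=self.op.output_fields)
#
# and any unknown field raises errors.OpPrereqError.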


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
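
# For reference, a call describing a single-NIC instance produces an
# environment along these lines (all values below are made-up examples;
# the hooks runner later prefixes every key with GANETI_):
#
#   {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#    "INSTANCE_NIC_COUNT": 1}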


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    vglist = node_result.get(constants.NV_VGLIST, None)
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    used_minors = node_result.get(constants.NV_DRBDLIST, [])
    for minor, (iname, must_exist) in drbd_map.items():
      if minor not in used_minors and must_exist:
        feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
                    (minor, iname))
        bad = True
    for minor in used_minors:
      if minor not in drbd_map:
        feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
        bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_LVLIST: vg_name,
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VGLIST: None,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_DRBDLIST: None,
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.status == "up")
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, lvdata.encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.CheckBEParams(self.op.beparams)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)
1467
class LURedistributeConfig(NoHooksLU):
1468
  """Force the redistribution of cluster configuration.
1469

1470
  This is a very simple LU.
1471

1472
  """
1473
  _OP_REQP = []
1474
  REQ_BGL = False
1475

    
1476
  def ExpandNames(self):
1477
    self.needed_locks = {
1478
      locking.LEVEL_NODE: locking.ALL_SET,
1479
    }
1480
    self.share_locks[locking.LEVEL_NODE] = 1
1481

    
1482
  def CheckPrereq(self):
1483
    """Check prerequisites.
1484

1485
    """
1486

    
1487
  def Exec(self, feedback_fn):
1488
    """Redistribute the configuration.
1489

1490
    """
1491
    self.cfg.Update(self.cfg.GetClusterInfo())
1492

    
1493

    
1494
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1495
  """Sleep and poll for an instance's disk to sync.
1496

1497
  """
1498
  if not instance.disks:
1499
    return True
1500

    
1501
  if not oneshot:
1502
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1503

    
1504
  node = instance.primary_node
1505

    
1506
  for dev in instance.disks:
1507
    lu.cfg.SetDiskID(dev, node)
1508

    
1509
  retries = 0
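  # Polling loop: ask the primary node for the mirror status of all
  # disks; on RPC failure retry up to 10 times with a 6 second pause,
  # otherwise sleep for (at most 60 seconds of) the largest estimated
  # remaining sync time before asking again.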
1510
  while True:
1511
    max_time = 0
1512
    done = True
1513
    cumul_degraded = False
1514
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1515
    if rstats.failed or not rstats.data:
1516
      lu.LogWarning("Can't get any data from node %s", node)
1517
      retries += 1
1518
      if retries >= 10:
1519
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1520
                                 " aborting." % node)
1521
      time.sleep(6)
1522
      continue
1523
    rstats = rstats.data
1524
    retries = 0
1525
    for i, mstat in enumerate(rstats):
1526
      if mstat is None:
1527
        lu.LogWarning("Can't compute data for node %s/%s",
1528
                           node, instance.disks[i].iv_name)
1529
        continue
1530
      # we ignore the ldisk parameter
1531
      perc_done, est_time, is_degraded, _ = mstat
1532
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1533
      if perc_done is not None:
1534
        done = False
1535
        if est_time is not None:
1536
          rem_time = "%d estimated seconds remaining" % est_time
1537
          max_time = est_time
1538
        else:
1539
          rem_time = "no time estimate"
1540
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1541
                        (instance.disks[i].iv_name, perc_done, rem_time))
1542
    if done or oneshot:
1543
      break
1544

    
1545
    time.sleep(min(60, max_time))
1546

    
1547
  if done:
1548
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1549
  return not cumul_degraded
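
# Minimal illustrative sketch, not used by any LU in this module: how a
# single mirror status tuple, as consumed by _WaitForSync above, can be
# interpreted.  The sample numbers are invented.
def _ExampleMirrorStatus():
  """Summarise one made-up mirror status tuple (illustration only)."""
  perc_done, est_time, is_degraded, _ = (87.5, 120, True, False)
  if perc_done is None:
    return "in sync (degraded=%s)" % is_degraded
  if est_time is not None:
    return "%.2f%% done, about %d seconds remaining" % (perc_done, est_time)
  return "%.2f%% done, no time estimate" % perc_done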
1550

    
1551

    
1552
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1553
  """Check that mirrors are not degraded.
1554

1555
  The ldisk parameter, if True, will change the test from the
1556
  is_degraded attribute (which represents overall non-ok status for
1557
  the device(s)) to the ldisk (representing the local storage status).
1558

1559
  """
1560
  lu.cfg.SetDiskID(dev, node)
1561
  if ldisk:
1562
    idx = 6
1563
  else:
1564
    idx = 5
1565

    
1566
  result = True
1567
  if on_primary or dev.AssembleOnSecondary():
1568
    rstats = lu.rpc.call_blockdev_find(node, dev)
1569
    if rstats.failed or not rstats.data:
1570
      logging.warning("Node %s: disk degraded, not found or node down", node)
1571
      result = False
1572
    else:
1573
      result = result and (not rstats.data[idx])
1574
  if dev.children:
1575
    for child in dev.children:
1576
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1577

    
1578
  return result
1579

    
1580

    
1581
class LUDiagnoseOS(NoHooksLU):
1582
  """Logical unit for OS diagnose/query.
1583

1584
  """
1585
  _OP_REQP = ["output_fields", "names"]
1586
  REQ_BGL = False
1587
  _FIELDS_STATIC = utils.FieldSet()
1588
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1589

    
1590
  def ExpandNames(self):
1591
    if self.op.names:
1592
      raise errors.OpPrereqError("Selective OS query not supported")
1593

    
1594
    _CheckOutputFields(static=self._FIELDS_STATIC,
1595
                       dynamic=self._FIELDS_DYNAMIC,
1596
                       selected=self.op.output_fields)
1597

    
1598
    # Lock all nodes, in shared mode
1599
    self.needed_locks = {}
1600
    self.share_locks[locking.LEVEL_NODE] = 1
1601
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1602

    
1603
  def CheckPrereq(self):
1604
    """Check prerequisites.
1605

1606
    """
1607

    
1608
  @staticmethod
1609
  def _DiagnoseByOS(node_list, rlist):
1610
    """Remaps a per-node return list into an a per-os per-node dictionary
1611

1612
    @param node_list: a list with the names of all nodes
1613
    @param rlist: a map with node names as keys and RPC results (lists of
        OS objects) as values
1614

1615
    @rtype: dict
1616
    @return: a dictionary with OS names as keys and, as values, another map with
1617
        nodes as keys and lists of OS objects as values, e.g.::
1618

1619
          {"debian-etch": {"node1": [<object>,...],
1620
                           "node2": [<object>,]}
1621
          }
1622

1623
    """
1624
    all_os = {}
1625
    for node_name, nr in rlist.iteritems():
1626
      if nr.failed or not nr.data:
1627
        continue
1628
      for os_obj in nr.data:
1629
        if os_obj.name not in all_os:
1630
          # build a list of nodes for this os containing empty lists
1631
          # for each node in node_list
1632
          all_os[os_obj.name] = {}
1633
          for nname in node_list:
1634
            all_os[os_obj.name][nname] = []
1635
        all_os[os_obj.name][node_name].append(os_obj)
1636
    return all_os
1637

    
1638
  def Exec(self, feedback_fn):
1639
    """Compute the list of OSes.
1640

1641
    """
1642
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1643
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1644
                   if node in node_list]
1645
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1646
    if node_data == False:
1647
      raise errors.OpExecError("Can't gather the list of OSes")
1648
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1649
    output = []
1650
    for os_name, os_data in pol.iteritems():
1651
      row = []
1652
      for field in self.op.output_fields:
1653
        if field == "name":
1654
          val = os_name
1655
        elif field == "valid":
1656
          val = utils.all([osl and osl[0] for osl in os_data.values()])
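          # an OS is reported as valid only if every node returned a
          # non-empty list whose first entry evaluates to true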
1657
        elif field == "node_status":
1658
          val = {}
1659
          for node_name, nos_list in os_data.iteritems():
1660
            val[node_name] = [(v.status, v.path) for v in nos_list]
1661
        else:
1662
          raise errors.ParameterError(field)
1663
        row.append(val)
1664
      output.append(row)
1665

    
1666
    return output
1667

    
1668

    
1669
class LURemoveNode(LogicalUnit):
1670
  """Logical unit for removing a node.
1671

1672
  """
1673
  HPATH = "node-remove"
1674
  HTYPE = constants.HTYPE_NODE
1675
  _OP_REQP = ["node_name"]
1676

    
1677
  def BuildHooksEnv(self):
1678
    """Build hooks env.
1679

1680
    This doesn't run on the target node in the pre phase as a failed
1681
    node would then be impossible to remove.
1682

1683
    """
1684
    env = {
1685
      "OP_TARGET": self.op.node_name,
1686
      "NODE_NAME": self.op.node_name,
1687
      }
1688
    all_nodes = self.cfg.GetNodeList()
1689
    all_nodes.remove(self.op.node_name)
1690
    return env, all_nodes, all_nodes
1691

    
1692
  def CheckPrereq(self):
1693
    """Check prerequisites.
1694

1695
    This checks:
1696
     - the node exists in the configuration
1697
     - it does not have primary or secondary instances
1698
     - it's not the master
1699

1700
    Any errors are signalled by raising errors.OpPrereqError.
1701

1702
    """
1703
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1704
    if node is None:
1705
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1706

    
1707
    instance_list = self.cfg.GetInstanceList()
1708

    
1709
    masternode = self.cfg.GetMasterNode()
1710
    if node.name == masternode:
1711
      raise errors.OpPrereqError("Node is the master node,"
1712
                                 " you need to failover first.")
1713

    
1714
    for instance_name in instance_list:
1715
      instance = self.cfg.GetInstanceInfo(instance_name)
1716
      if node.name in instance.all_nodes:
1717
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1718
                                   " please remove first." % instance_name)
1719
    self.op.node_name = node.name
1720
    self.node = node
1721

    
1722
  def Exec(self, feedback_fn):
1723
    """Removes the node from the cluster.
1724

1725
    """
1726
    node = self.node
1727
    logging.info("Stopping the node daemon and removing configs from node %s",
1728
                 node.name)
1729

    
1730
    self.context.RemoveNode(node.name)
1731

    
1732
    self.rpc.call_node_leave_cluster(node.name)
1733

    
1734
    # Promote nodes to master candidate as needed
1735
    _AdjustCandidatePool(self)
1736

    
1737

    
1738
class LUQueryNodes(NoHooksLU):
1739
  """Logical unit for querying nodes.
1740

1741
  """
1742
  _OP_REQP = ["output_fields", "names"]
1743
  REQ_BGL = False
1744
  _FIELDS_DYNAMIC = utils.FieldSet(
1745
    "dtotal", "dfree",
1746
    "mtotal", "mnode", "mfree",
1747
    "bootid",
1748
    "ctotal",
1749
    )
1750

    
1751
  _FIELDS_STATIC = utils.FieldSet(
1752
    "name", "pinst_cnt", "sinst_cnt",
1753
    "pinst_list", "sinst_list",
1754
    "pip", "sip", "tags",
1755
    "serial_no",
1756
    "master_candidate",
1757
    "master",
1758
    "offline",
1759
    )
1760

    
1761
  def ExpandNames(self):
1762
    _CheckOutputFields(static=self._FIELDS_STATIC,
1763
                       dynamic=self._FIELDS_DYNAMIC,
1764
                       selected=self.op.output_fields)
1765

    
1766
    self.needed_locks = {}
1767
    self.share_locks[locking.LEVEL_NODE] = 1
1768

    
1769
    if self.op.names:
1770
      self.wanted = _GetWantedNodes(self, self.op.names)
1771
    else:
1772
      self.wanted = locking.ALL_SET
1773

    
1774
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1775
    if self.do_locking:
1776
      # if we don't request only static fields, we need to lock the nodes
1777
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1778

    
1779

    
1780
  def CheckPrereq(self):
1781
    """Check prerequisites.
1782

1783
    """
1784
    # The validation of the node list is done in _GetWantedNodes, if the
1785
    # list is non-empty; if it is empty, there is no validation to do
1786
    pass
1787

    
1788
  def Exec(self, feedback_fn):
1789
    """Computes the list of nodes and their attributes.
1790

1791
    """
1792
    all_info = self.cfg.GetAllNodesInfo()
1793
    if self.do_locking:
1794
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1795
    elif self.wanted != locking.ALL_SET:
1796
      nodenames = self.wanted
1797
      missing = set(nodenames).difference(all_info.keys())
1798
      if missing:
1799
        raise errors.OpExecError(
1800
          "Some nodes were removed before retrieving their data: %s" % missing)
1801
    else:
1802
      nodenames = all_info.keys()
1803

    
1804
    nodenames = utils.NiceSort(nodenames)
1805
    nodelist = [all_info[name] for name in nodenames]
1806

    
1807
    # begin data gathering
1808

    
1809
    if self.do_locking:
1810
      live_data = {}
1811
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1812
                                          self.cfg.GetHypervisorType())
1813
      for name in nodenames:
1814
        nodeinfo = node_data[name]
1815
        if not nodeinfo.failed and nodeinfo.data:
1816
          nodeinfo = nodeinfo.data
1817
          fn = utils.TryConvert
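          # TryConvert applies int() but falls back to the original
          # value (e.g. None) if the conversion fails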
1818
          live_data[name] = {
1819
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1820
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1821
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1822
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1823
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1824
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1825
            "bootid": nodeinfo.get('bootid', None),
1826
            }
1827
        else:
1828
          live_data[name] = {}
1829
    else:
1830
      live_data = dict.fromkeys(nodenames, {})
1831

    
1832
    node_to_primary = dict([(name, set()) for name in nodenames])
1833
    node_to_secondary = dict([(name, set()) for name in nodenames])
1834

    
1835
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1836
                             "sinst_cnt", "sinst_list"))
1837
    if inst_fields & frozenset(self.op.output_fields):
1838
      instancelist = self.cfg.GetInstanceList()
1839

    
1840
      for instance_name in instancelist:
1841
        inst = self.cfg.GetInstanceInfo(instance_name)
1842
        if inst.primary_node in node_to_primary:
1843
          node_to_primary[inst.primary_node].add(inst.name)
1844
        for secnode in inst.secondary_nodes:
1845
          if secnode in node_to_secondary:
1846
            node_to_secondary[secnode].add(inst.name)
1847

    
1848
    master_node = self.cfg.GetMasterNode()
1849

    
1850
    # end data gathering
1851

    
1852
    output = []
1853
    for node in nodelist:
1854
      node_output = []
1855
      for field in self.op.output_fields:
1856
        if field == "name":
1857
          val = node.name
1858
        elif field == "pinst_list":
1859
          val = list(node_to_primary[node.name])
1860
        elif field == "sinst_list":
1861
          val = list(node_to_secondary[node.name])
1862
        elif field == "pinst_cnt":
1863
          val = len(node_to_primary[node.name])
1864
        elif field == "sinst_cnt":
1865
          val = len(node_to_secondary[node.name])
1866
        elif field == "pip":
1867
          val = node.primary_ip
1868
        elif field == "sip":
1869
          val = node.secondary_ip
1870
        elif field == "tags":
1871
          val = list(node.GetTags())
1872
        elif field == "serial_no":
1873
          val = node.serial_no
1874
        elif field == "master_candidate":
1875
          val = node.master_candidate
1876
        elif field == "master":
1877
          val = node.name == master_node
1878
        elif field == "offline":
1879
          val = node.offline
1880
        elif self._FIELDS_DYNAMIC.Matches(field):
1881
          val = live_data[node.name].get(field, None)
1882
        else:
1883
          raise errors.ParameterError(field)
1884
        node_output.append(val)
1885
      output.append(node_output)
1886

    
1887
    return output
1888

    
1889

    
1890
class LUQueryNodeVolumes(NoHooksLU):
1891
  """Logical unit for getting volumes on node(s).
1892

1893
  """
1894
  _OP_REQP = ["nodes", "output_fields"]
1895
  REQ_BGL = False
1896
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1897
  _FIELDS_STATIC = utils.FieldSet("node")
1898

    
1899
  def ExpandNames(self):
1900
    _CheckOutputFields(static=self._FIELDS_STATIC,
1901
                       dynamic=self._FIELDS_DYNAMIC,
1902
                       selected=self.op.output_fields)
1903

    
1904
    self.needed_locks = {}
1905
    self.share_locks[locking.LEVEL_NODE] = 1
1906
    if not self.op.nodes:
1907
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1908
    else:
1909
      self.needed_locks[locking.LEVEL_NODE] = \
1910
        _GetWantedNodes(self, self.op.nodes)
1911

    
1912
  def CheckPrereq(self):
1913
    """Check prerequisites.
1914

1915
    This checks that the fields required are valid output fields.
1916

1917
    """
1918
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1919

    
1920
  def Exec(self, feedback_fn):
1921
    """Computes the list of nodes and their attributes.
1922

1923
    """
1924
    nodenames = self.nodes
1925
    volumes = self.rpc.call_node_volumes(nodenames)
1926

    
1927
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1928
             in self.cfg.GetInstanceList()]
1929

    
1930
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1931

    
1932
    output = []
1933
    for node in nodenames:
1934
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1935
        continue
1936

    
1937
      node_vols = volumes[node].data[:]
1938
      node_vols.sort(key=lambda vol: vol['dev'])
1939

    
1940
      for vol in node_vols:
1941
        node_output = []
1942
        for field in self.op.output_fields:
1943
          if field == "node":
1944
            val = node
1945
          elif field == "phys":
1946
            val = vol['dev']
1947
          elif field == "vg":
1948
            val = vol['vg']
1949
          elif field == "name":
1950
            val = vol['name']
1951
          elif field == "size":
1952
            val = int(float(vol['size']))
1953
          elif field == "instance":
1954
            for inst in ilist:
1955
              if node not in lv_by_node[inst]:
1956
                continue
1957
              if vol['name'] in lv_by_node[inst][node]:
1958
                val = inst.name
1959
                break
1960
            else:
1961
              val = '-'
1962
          else:
1963
            raise errors.ParameterError(field)
1964
          node_output.append(str(val))
1965

    
1966
        output.append(node_output)
1967

    
1968
    return output
1969

    
1970

    
1971
class LUAddNode(LogicalUnit):
1972
  """Logical unit for adding node to the cluster.
1973

1974
  """
1975
  HPATH = "node-add"
1976
  HTYPE = constants.HTYPE_NODE
1977
  _OP_REQP = ["node_name"]
1978

    
1979
  def BuildHooksEnv(self):
1980
    """Build hooks env.
1981

1982
    This will run on all nodes before, and on all nodes + the new node after.
1983

1984
    """
1985
    env = {
1986
      "OP_TARGET": self.op.node_name,
1987
      "NODE_NAME": self.op.node_name,
1988
      "NODE_PIP": self.op.primary_ip,
1989
      "NODE_SIP": self.op.secondary_ip,
1990
      }
1991
    nodes_0 = self.cfg.GetNodeList()
1992
    nodes_1 = nodes_0 + [self.op.node_name, ]
1993
    return env, nodes_0, nodes_1
1994

    
1995
  def CheckPrereq(self):
1996
    """Check prerequisites.
1997

1998
    This checks:
1999
     - the new node is not already in the config
2000
     - it is resolvable
2001
     - its parameters (single/dual homed) matches the cluster
2002

2003
    Any errors are signalled by raising errors.OpPrereqError.
2004

2005
    """
2006
    node_name = self.op.node_name
2007
    cfg = self.cfg
2008

    
2009
    dns_data = utils.HostInfo(node_name)
2010

    
2011
    node = dns_data.name
2012
    primary_ip = self.op.primary_ip = dns_data.ip
2013
    secondary_ip = getattr(self.op, "secondary_ip", None)
2014
    if secondary_ip is None:
2015
      secondary_ip = primary_ip
2016
    if not utils.IsValidIP(secondary_ip):
2017
      raise errors.OpPrereqError("Invalid secondary IP given")
2018
    self.op.secondary_ip = secondary_ip
2019

    
2020
    node_list = cfg.GetNodeList()
2021
    if not self.op.readd and node in node_list:
2022
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2023
                                 node)
2024
    elif self.op.readd and node not in node_list:
2025
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2026

    
2027
    for existing_node_name in node_list:
2028
      existing_node = cfg.GetNodeInfo(existing_node_name)
2029

    
2030
      if self.op.readd and node == existing_node_name:
2031
        if (existing_node.primary_ip != primary_ip or
2032
            existing_node.secondary_ip != secondary_ip):
2033
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2034
                                     " address configuration as before")
2035
        continue
2036

    
2037
      if (existing_node.primary_ip == primary_ip or
2038
          existing_node.secondary_ip == primary_ip or
2039
          existing_node.primary_ip == secondary_ip or
2040
          existing_node.secondary_ip == secondary_ip):
2041
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2042
                                   " existing node %s" % existing_node.name)
2043

    
2044
    # check that the type of the node (single versus dual homed) is the
2045
    # same as for the master
2046
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2047
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2048
    newbie_singlehomed = secondary_ip == primary_ip
2049
    if master_singlehomed != newbie_singlehomed:
2050
      if master_singlehomed:
2051
        raise errors.OpPrereqError("The master has no private ip but the"
2052
                                   " new node has one")
2053
      else:
2054
        raise errors.OpPrereqError("The master has a private ip but the"
2055
                                   " new node doesn't have one")
2056

    
2057
    # check reachability
2058
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2059
      raise errors.OpPrereqError("Node not reachable by ping")
2060

    
2061
    if not newbie_singlehomed:
2062
      # check reachability from my secondary ip to newbie's secondary ip
2063
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2064
                           source=myself.secondary_ip):
2065
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2066
                                   " based ping to noded port")
2067

    
2068
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2069
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2070
    master_candidate = mc_now < cp_size
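    # the new node is made a master candidate only while the current
    # number of candidates is below the configured pool size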
2071

    
2072
    self.new_node = objects.Node(name=node,
2073
                                 primary_ip=primary_ip,
2074
                                 secondary_ip=secondary_ip,
2075
                                 master_candidate=master_candidate,
2076
                                 offline=False)
2077

    
2078
  def Exec(self, feedback_fn):
2079
    """Adds the new node to the cluster.
2080

2081
    """
2082
    new_node = self.new_node
2083
    node = new_node.name
2084

    
2085
    # check connectivity
2086
    result = self.rpc.call_version([node])[node]
2087
    result.Raise()
2088
    if result.data:
2089
      if constants.PROTOCOL_VERSION == result.data:
2090
        logging.info("Communication to node %s fine, sw version %s match",
2091
                     node, result.data)
2092
      else:
2093
        raise errors.OpExecError("Version mismatch master version %s,"
2094
                                 " node version %s" %
2095
                                 (constants.PROTOCOL_VERSION, result.data))
2096
    else:
2097
      raise errors.OpExecError("Cannot get version from the new node")
2098

    
2099
    # setup ssh on node
2100
    logging.info("Copy ssh key to node %s", node)
2101
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2102
    keyarray = []
2103
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2104
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2105
                priv_key, pub_key]
2106

    
2107
    for i in keyfiles:
2108
      f = open(i, 'r')
2109
      try:
2110
        keyarray.append(f.read())
2111
      finally:
2112
        f.close()
2113

    
2114
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2115
                                    keyarray[2],
2116
                                    keyarray[3], keyarray[4], keyarray[5])
2117

    
2118
    if result.failed or not result.data:
2119
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2120

    
2121
    # Add node to our /etc/hosts, and add key to known_hosts
2122
    utils.AddHostToEtcHosts(new_node.name)
2123

    
2124
    if new_node.secondary_ip != new_node.primary_ip:
2125
      result = self.rpc.call_node_has_ip_address(new_node.name,
2126
                                                 new_node.secondary_ip)
2127
      if result.failed or not result.data:
2128
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2129
                                 " you gave (%s). Please fix and re-run this"
2130
                                 " command." % new_node.secondary_ip)
2131

    
2132
    node_verify_list = [self.cfg.GetMasterNode()]
2133
    node_verify_param = {
2134
      'nodelist': [node],
2135
      # TODO: do a node-net-test as well?
2136
    }
2137

    
2138
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2139
                                       self.cfg.GetClusterName())
2140
    for verifier in node_verify_list:
2141
      if result[verifier].failed or not result[verifier].data:
2142
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2143
                                 " for remote verification" % verifier)
2144
      if result[verifier].data['nodelist']:
2145
        for failed in result[verifier].data['nodelist']:
2146
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2147
                      (verifier, result[verifier].data['nodelist'][failed]))
2148
        raise errors.OpExecError("ssh/hostname verification failed.")
2149

    
2150
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2151
    # including the node just added
2152
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2153
    dist_nodes = self.cfg.GetNodeList()
2154
    if not self.op.readd:
2155
      dist_nodes.append(node)
2156
    if myself.name in dist_nodes:
2157
      dist_nodes.remove(myself.name)
2158

    
2159
    logging.debug("Copying hosts and known_hosts to all nodes")
2160
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2161
      result = self.rpc.call_upload_file(dist_nodes, fname)
2162
      for to_node, to_result in result.iteritems():
2163
        if to_result.failed or not to_result.data:
2164
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2165

    
2166
    to_copy = []
2167
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2168
      to_copy.append(constants.VNC_PASSWORD_FILE)
2169
    for fname in to_copy:
2170
      result = self.rpc.call_upload_file([node], fname)
2171
      if result[node].failed or not result[node].data:
2172
        logging.error("Could not copy file %s to node %s", fname, node)
2173

    
2174
    if self.op.readd:
2175
      self.context.ReaddNode(new_node)
2176
    else:
2177
      self.context.AddNode(new_node)
2178

    
2179

    
2180
class LUSetNodeParams(LogicalUnit):
2181
  """Modifies the parameters of a node.
2182

2183
  """
2184
  HPATH = "node-modify"
2185
  HTYPE = constants.HTYPE_NODE
2186
  _OP_REQP = ["node_name"]
2187
  REQ_BGL = False
2188

    
2189
  def CheckArguments(self):
2190
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2191
    if node_name is None:
2192
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2193
    self.op.node_name = node_name
2194
    _CheckBooleanOpField(self.op, 'master_candidate')
2195
    _CheckBooleanOpField(self.op, 'offline')
2196
    if self.op.master_candidate is None and self.op.offline is None:
2197
      raise errors.OpPrereqError("Please pass at least one modification")
2198
    if self.op.offline == True and self.op.master_candidate == True:
2199
      raise errors.OpPrereqError("Can't set the node into offline and"
2200
                                 " master_candidate at the same time")
2201

    
2202
  def ExpandNames(self):
2203
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2204

    
2205
  def BuildHooksEnv(self):
2206
    """Build hooks env.
2207

2208
    This runs on the master node.
2209

2210
    """
2211
    env = {
2212
      "OP_TARGET": self.op.node_name,
2213
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2214
      "OFFLINE": str(self.op.offline),
2215
      }
2216
    nl = [self.cfg.GetMasterNode(),
2217
          self.op.node_name]
2218
    return env, nl, nl
2219

    
2220
  def CheckPrereq(self):
2221
    """Check prerequisites.
2222

2223
    This only checks the instance list against the existing names.
2224

2225
    """
2226
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2227

    
2228
    if ((self.op.master_candidate == False or self.op.offline == True)
2229
        and node.master_candidate):
2230
      # we will demote the node from master_candidate
2231
      if self.op.node_name == self.cfg.GetMasterNode():
2232
        raise errors.OpPrereqError("The master node has to be a"
2233
                                   " master candidate and online")
2234
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2235
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2236
      if num_candidates <= cp_size:
2237
        msg = ("Not enough master candidates (desired"
2238
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2239
        if self.op.force:
2240
          self.LogWarning(msg)
2241
        else:
2242
          raise errors.OpPrereqError(msg)
2243

    
2244
    if (self.op.master_candidate == True and node.offline and
2245
        not self.op.offline == False):
2246
      raise errors.OpPrereqError("Can't set an offline node to"
2247
                                 " master_candidate")
2248

    
2249
    return
2250

    
2251
  def Exec(self, feedback_fn):
2252
    """Modifies a node.
2253

2254
    """
2255
    node = self.node
2256

    
2257
    result = []
2258

    
2259
    if self.op.offline is not None:
2260
      node.offline = self.op.offline
2261
      result.append(("offline", str(self.op.offline)))
2262
      if self.op.offline == True and node.master_candidate:
2263
        node.master_candidate = False
2264
        result.append(("master_candidate", "auto-demotion due to offline"))
2265

    
2266
    if self.op.master_candidate is not None:
2267
      node.master_candidate = self.op.master_candidate
2268
      result.append(("master_candidate", str(self.op.master_candidate)))
2269
      if self.op.master_candidate == False:
2270
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2271
        if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2272
            or len(rrc.data) != 2):
2273
          self.LogWarning("Node rpc error: %s" % rrc.error)
2274
        elif not rrc.data[0]:
2275
          self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2276

    
2277
    # this will trigger configuration file update, if needed
2278
    self.cfg.Update(node)
2279
    # this will trigger job queue propagation or cleanup
2280
    if self.op.node_name != self.cfg.GetMasterNode():
2281
      self.context.ReaddNode(node)
2282

    
2283
    return result
2284

    
2285

    
2286
class LUQueryClusterInfo(NoHooksLU):
2287
  """Query cluster configuration.
2288

2289
  """
2290
  _OP_REQP = []
2291
  REQ_BGL = False
2292

    
2293
  def ExpandNames(self):
2294
    self.needed_locks = {}
2295

    
2296
  def CheckPrereq(self):
2297
    """No prerequsites needed for this LU.
2298

2299
    """
2300
    pass
2301

    
2302
  def Exec(self, feedback_fn):
2303
    """Return cluster config.
2304

2305
    """
2306
    cluster = self.cfg.GetClusterInfo()
2307
    result = {
2308
      "software_version": constants.RELEASE_VERSION,
2309
      "protocol_version": constants.PROTOCOL_VERSION,
2310
      "config_version": constants.CONFIG_VERSION,
2311
      "os_api_version": constants.OS_API_VERSION,
2312
      "export_version": constants.EXPORT_VERSION,
2313
      "architecture": (platform.architecture()[0], platform.machine()),
2314
      "name": cluster.cluster_name,
2315
      "master": cluster.master_node,
2316
      "default_hypervisor": cluster.default_hypervisor,
2317
      "enabled_hypervisors": cluster.enabled_hypervisors,
2318
      "hvparams": cluster.hvparams,
2319
      "beparams": cluster.beparams,
2320
      "candidate_pool_size": cluster.candidate_pool_size,
2321
      }
2322

    
2323
    return result
2324

    
2325

    
2326
class LUQueryConfigValues(NoHooksLU):
2327
  """Return configuration values.
2328

2329
  """
2330
  _OP_REQP = []
2331
  REQ_BGL = False
2332
  _FIELDS_DYNAMIC = utils.FieldSet()
2333
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2334

    
2335
  def ExpandNames(self):
2336
    self.needed_locks = {}
2337

    
2338
    _CheckOutputFields(static=self._FIELDS_STATIC,
2339
                       dynamic=self._FIELDS_DYNAMIC,
2340
                       selected=self.op.output_fields)
2341

    
2342
  def CheckPrereq(self):
2343
    """No prerequisites.
2344

2345
    """
2346
    pass
2347

    
2348
  def Exec(self, feedback_fn):
2349
    """Dump a representation of the cluster config to the standard output.
2350

2351
    """
2352
    values = []
2353
    for field in self.op.output_fields:
2354
      if field == "cluster_name":
2355
        entry = self.cfg.GetClusterName()
2356
      elif field == "master_node":
2357
        entry = self.cfg.GetMasterNode()
2358
      elif field == "drain_flag":
2359
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2360
      else:
2361
        raise errors.ParameterError(field)
2362
      values.append(entry)
2363
    return values
2364

    
2365

    
2366
class LUActivateInstanceDisks(NoHooksLU):
2367
  """Bring up an instance's disks.
2368

2369
  """
2370
  _OP_REQP = ["instance_name"]
2371
  REQ_BGL = False
2372

    
2373
  def ExpandNames(self):
2374
    self._ExpandAndLockInstance()
2375
    self.needed_locks[locking.LEVEL_NODE] = []
2376
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2377

    
2378
  def DeclareLocks(self, level):
2379
    if level == locking.LEVEL_NODE:
2380
      self._LockInstancesNodes()
2381

    
2382
  def CheckPrereq(self):
2383
    """Check prerequisites.
2384

2385
    This checks that the instance is in the cluster.
2386

2387
    """
2388
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2389
    assert self.instance is not None, \
2390
      "Cannot retrieve locked instance %s" % self.op.instance_name
2391
    _CheckNodeOnline(self, self.instance.primary_node)
2392

    
2393
  def Exec(self, feedback_fn):
2394
    """Activate the disks.
2395

2396
    """
2397
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2398
    if not disks_ok:
2399
      raise errors.OpExecError("Cannot activate block devices")
2400

    
2401
    return disks_info
2402

    
2403

    
2404
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2405
  """Prepare the block devices for an instance.
2406

2407
  This sets up the block devices on all nodes.
2408

2409
  @type lu: L{LogicalUnit}
2410
  @param lu: the logical unit on whose behalf we execute
2411
  @type instance: L{objects.Instance}
2412
  @param instance: the instance for whose disks we assemble
2413
  @type ignore_secondaries: boolean
2414
  @param ignore_secondaries: if true, errors on secondary nodes
2415
      won't result in an error return from the function
2416
  @return: False if the operation failed, otherwise a list of
2417
      (host, instance_visible_name, node_visible_name)
2418
      with the mapping from node devices to instance devices
2419

2420
  """
2421
  device_info = []
2422
  disks_ok = True
2423
  iname = instance.name
2424
  # With the two-pass mechanism we try to reduce the window of
2425
  # opportunity for the race condition of switching DRBD to primary
2426
  # before handshaking occurred, but we do not eliminate it
2427

    
2428
  # The proper fix would be to wait (with some limits) until the
2429
  # connection has been made and drbd transitions from WFConnection
2430
  # into any other network-connected state (Connected, SyncTarget,
2431
  # SyncSource, etc.)
2432

    
2433
  # 1st pass, assemble on all nodes in secondary mode
2434
  for inst_disk in instance.disks:
2435
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2436
      lu.cfg.SetDiskID(node_disk, node)
2437
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2438
      if result.failed or not result:
2439
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2440
                           " (is_primary=False, pass=1)",
2441
                           inst_disk.iv_name, node)
2442
        if not ignore_secondaries:
2443
          disks_ok = False
2444

    
2445
  # FIXME: race condition on drbd migration to primary
2446

    
2447
  # 2nd pass, do only the primary node
2448
  for inst_disk in instance.disks:
2449
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2450
      if node != instance.primary_node:
2451
        continue
2452
      lu.cfg.SetDiskID(node_disk, node)
2453
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2454
      if result.failed or not result:
2455
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2456
                           " (is_primary=True, pass=2)",
2457
                           inst_disk.iv_name, node)
2458
        disks_ok = False
2459
    device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2460

    
2461
  # leave the disks configured for the primary node
2462
  # this is a workaround that would be fixed better by
2463
  # improving the logical/physical id handling
2464
  for disk in instance.disks:
2465
    lu.cfg.SetDiskID(disk, instance.primary_node)
2466

    
2467
  return disks_ok, device_info
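  # Illustrative shape of the returned value (the names below are
  # invented): disks_ok is a boolean and device_info is a list of
  # (primary_node, iv_name, assemble_result) tuples, e.g.
  #   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])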
2468

    
2469

    
2470
def _StartInstanceDisks(lu, instance, force):
2471
  """Start the disks of an instance.
2472

2473
  """
2474
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2475
                                           ignore_secondaries=force)
2476
  if not disks_ok:
2477
    _ShutdownInstanceDisks(lu, instance)
2478
    if force is not None and not force:
2479
      lu.proc.LogWarning("", hint="If the message above refers to a"
2480
                         " secondary node,"
2481
                         " you can retry the operation using '--force'.")
2482
    raise errors.OpExecError("Disk consistency error")
2483

    
2484

    
2485
class LUDeactivateInstanceDisks(NoHooksLU):
2486
  """Shutdown an instance's disks.
2487

2488
  """
2489
  _OP_REQP = ["instance_name"]
2490
  REQ_BGL = False
2491

    
2492
  def ExpandNames(self):
2493
    self._ExpandAndLockInstance()
2494
    self.needed_locks[locking.LEVEL_NODE] = []
2495
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2496

    
2497
  def DeclareLocks(self, level):
2498
    if level == locking.LEVEL_NODE:
2499
      self._LockInstancesNodes()
2500

    
2501
  def CheckPrereq(self):
2502
    """Check prerequisites.
2503

2504
    This checks that the instance is in the cluster.
2505

2506
    """
2507
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2508
    assert self.instance is not None, \
2509
      "Cannot retrieve locked instance %s" % self.op.instance_name
2510

    
2511
  def Exec(self, feedback_fn):
2512
    """Deactivate the disks
2513

2514
    """
2515
    instance = self.instance
2516
    _SafeShutdownInstanceDisks(self, instance)
2517

    
2518

    
2519
def _SafeShutdownInstanceDisks(lu, instance):
2520
  """Shutdown block devices of an instance.
2521

2522
  This function checks if an instance is running, before calling
2523
  _ShutdownInstanceDisks.
2524

2525
  """
2526
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2527
                                      [instance.hypervisor])
2528
  ins_l = ins_l[instance.primary_node]
2529
  if ins_l.failed or not isinstance(ins_l.data, list):
2530
    raise errors.OpExecError("Can't contact node '%s'" %
2531
                             instance.primary_node)
2532

    
2533
  if instance.name in ins_l.data:
2534
    raise errors.OpExecError("Instance is running, can't shutdown"
2535
                             " block devices.")
2536

    
2537
  _ShutdownInstanceDisks(lu, instance)
2538

    
2539

    
2540
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2541
  """Shutdown block devices of an instance.
2542

2543
  This does the shutdown on all nodes of the instance.
2544

2545
  If ignore_primary is false, errors on the primary node are not
2546
  ignored and will make the function return False.
2547

2548
  """
2549
  result = True
2550
  for disk in instance.disks:
2551
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2552
      lu.cfg.SetDiskID(top_disk, node)
2553
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2554
      if result.failed or not result.data:
2555
        logging.error("Could not shutdown block device %s on node %s",
2556
                      disk.iv_name, node)
2557
        if not ignore_primary or node != instance.primary_node:
2558
          result = False
2559
  return result
2560

    
2561

    
2562
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2563
  """Checks if a node has enough free memory.
2564

2565
  This function checks if a given node has the needed amount of free
2566
  memory. In case the node has less memory or we cannot get the
2567
  information from the node, this function raises an OpPrereqError
2568
  exception.
2569

2570
  @type lu: C{LogicalUnit}
2571
  @param lu: a logical unit from which we get configuration data
2572
  @type node: C{str}
2573
  @param node: the node to check
2574
  @type reason: C{str}
2575
  @param reason: string to use in the error message
2576
  @type requested: C{int}
2577
  @param requested: the amount of memory in MiB to check for
2578
  @type hypervisor_name: C{str}
2579
  @param hypervisor_name: the hypervisor to ask for memory stats
2580
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2581
      we cannot check the node
2582

2583
  """
2584
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2585
  nodeinfo[node].Raise()
2586
  free_mem = nodeinfo[node].data.get('memory_free')
2587
  if not isinstance(free_mem, int):
2588
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2589
                             " was '%s'" % (node, free_mem))
2590
  if requested > free_mem:
2591
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2592
                             " needed %s MiB, available %s MiB" %
2593
                             (node, reason, requested, free_mem))
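
# Illustrative sketch only, not called anywhere in this module: how the
# free-memory check above could be driven.  The node name, the 512 MiB
# figure and the choice of hypervisor are made-up example values; "lu"
# stands for any LogicalUnit instance.
def _ExampleFreeMemoryCheck(lu):
  """Example driver for _CheckNodeFreeMemory (illustration only)."""
  _CheckNodeFreeMemory(lu, "node1.example.com",
                       "starting instance instance1.example.com",
                       512, constants.HT_XEN_HVM)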
2594

    
2595

    
2596
class LUStartupInstance(LogicalUnit):
2597
  """Starts an instance.
2598

2599
  """
2600
  HPATH = "instance-start"
2601
  HTYPE = constants.HTYPE_INSTANCE
2602
  _OP_REQP = ["instance_name", "force"]
2603
  REQ_BGL = False
2604

    
2605
  def ExpandNames(self):
2606
    self._ExpandAndLockInstance()
2607

    
2608
  def BuildHooksEnv(self):
2609
    """Build hooks env.
2610

2611
    This runs on master, primary and secondary nodes of the instance.
2612

2613
    """
2614
    env = {
2615
      "FORCE": self.op.force,
2616
      }
2617
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2618
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2619
    return env, nl, nl
2620

    
2621
  def CheckPrereq(self):
2622
    """Check prerequisites.
2623

2624
    This checks that the instance is in the cluster.
2625

2626
    """
2627
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2628
    assert self.instance is not None, \
2629
      "Cannot retrieve locked instance %s" % self.op.instance_name
2630

    
2631
    _CheckNodeOnline(self, instance.primary_node)
2632

    
2633
    bep = self.cfg.GetClusterInfo().FillBE(instance)
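    # FillBE merges the cluster-level backend parameter defaults with
    # the instance's own overrides, so BE_MEMORY below is the effective
    # memory size used for the free-memory check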
2634
    # check bridges existence
2635
    _CheckInstanceBridgesExist(self, instance)
2636

    
2637
    _CheckNodeFreeMemory(self, instance.primary_node,
2638
                         "starting instance %s" % instance.name,
2639
                         bep[constants.BE_MEMORY], instance.hypervisor)
2640

    
2641
  def Exec(self, feedback_fn):
2642
    """Start the instance.
2643

2644
    """
2645
    instance = self.instance
2646
    force = self.op.force
2647
    extra_args = getattr(self.op, "extra_args", "")
2648

    
2649
    self.cfg.MarkInstanceUp(instance.name)
2650

    
2651
    node_current = instance.primary_node
2652

    
2653
    _StartInstanceDisks(self, instance, force)
2654

    
2655
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2656
    msg = result.RemoteFailMsg()
2657
    if msg:
2658
      _ShutdownInstanceDisks(self, instance)
2659
      raise errors.OpExecError("Could not start instance: %s" % msg)
2660

    
2661

    
2662
class LURebootInstance(LogicalUnit):
2663
  """Reboot an instance.
2664

2665
  """
2666
  HPATH = "instance-reboot"
2667
  HTYPE = constants.HTYPE_INSTANCE
2668
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2669
  REQ_BGL = False
2670

    
2671
  def ExpandNames(self):
2672
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2673
                                   constants.INSTANCE_REBOOT_HARD,
2674
                                   constants.INSTANCE_REBOOT_FULL]:
2675
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2676
                                  (constants.INSTANCE_REBOOT_SOFT,
2677
                                   constants.INSTANCE_REBOOT_HARD,
2678
                                   constants.INSTANCE_REBOOT_FULL))
2679
    self._ExpandAndLockInstance()
2680

    
2681
  def BuildHooksEnv(self):
2682
    """Build hooks env.
2683

2684
    This runs on master, primary and secondary nodes of the instance.
2685

2686
    """
2687
    env = {
2688
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2689
      }
2690
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2691
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2692
    return env, nl, nl
2693

    
2694
  def CheckPrereq(self):
2695
    """Check prerequisites.
2696

2697
    This checks that the instance is in the cluster.
2698

2699
    """
2700
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2701
    assert self.instance is not None, \
2702
      "Cannot retrieve locked instance %s" % self.op.instance_name
2703

    
2704
    _CheckNodeOnline(self, instance.primary_node)
2705

    
2706
    # check bridges existence
2707
    _CheckInstanceBridgesExist(self, instance)
2708

    
2709
  def Exec(self, feedback_fn):
2710
    """Reboot the instance.
2711

2712
    """
2713
    instance = self.instance
2714
    ignore_secondaries = self.op.ignore_secondaries
2715
    reboot_type = self.op.reboot_type
2716
    extra_args = getattr(self.op, "extra_args", "")
2717

    
2718
    node_current = instance.primary_node
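    # soft and hard reboots are delegated to the hypervisor on the
    # primary node; a "full" reboot is emulated by shutting the instance
    # and its disks down and then starting them again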
2719

    
2720
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2721
                       constants.INSTANCE_REBOOT_HARD]:
2722
      result = self.rpc.call_instance_reboot(node_current, instance,
2723
                                             reboot_type, extra_args)
2724
      if result.failed or not result.data:
2725
        raise errors.OpExecError("Could not reboot instance")
2726
    else:
2727
      if not self.rpc.call_instance_shutdown(node_current, instance):
2728
        raise errors.OpExecError("could not shutdown instance for full reboot")
2729
      _ShutdownInstanceDisks(self, instance)
2730
      _StartInstanceDisks(self, instance, ignore_secondaries)
2731
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2732
      msg = result.RemoteFailMsg()
2733
      if msg:
2734
        _ShutdownInstanceDisks(self, instance)
2735
        raise errors.OpExecError("Could not start instance for"
2736
                                 " full reboot: %s" % msg)
2737

    
2738
    self.cfg.MarkInstanceUp(instance.name)
2739

    
2740

    
2741
class LUShutdownInstance(LogicalUnit):
2742
  """Shutdown an instance.
2743

2744
  """
2745
  HPATH = "instance-stop"
2746
  HTYPE = constants.HTYPE_INSTANCE
2747
  _OP_REQP = ["instance_name"]
2748
  REQ_BGL = False
2749

    
2750
  def ExpandNames(self):
2751
    self._ExpandAndLockInstance()
2752

    
2753
  def BuildHooksEnv(self):
2754
    """Build hooks env.
2755

2756
    This runs on master, primary and secondary nodes of the instance.
2757

2758
    """
2759
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2760
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2761
    return env, nl, nl
2762

    
2763
  def CheckPrereq(self):
2764
    """Check prerequisites.
2765

2766
    This checks that the instance is in the cluster.
2767

2768
    """
2769
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2770
    assert self.instance is not None, \
2771
      "Cannot retrieve locked instance %s" % self.op.instance_name
2772
    _CheckNodeOnline(self, self.instance.primary_node)
2773

    
2774
  def Exec(self, feedback_fn):
2775
    """Shutdown the instance.
2776

2777
    """
2778
    instance = self.instance
2779
    node_current = instance.primary_node
2780
    self.cfg.MarkInstanceDown(instance.name)
2781
    result = self.rpc.call_instance_shutdown(node_current, instance)
2782
    if result.failed or not result.data:
2783
      self.proc.LogWarning("Could not shutdown instance")
2784

    
2785
    _ShutdownInstanceDisks(self, instance)
2786

    
2787

    
2788
class LUReinstallInstance(LogicalUnit):
2789
  """Reinstall an instance.
2790

2791
  """
2792
  HPATH = "instance-reinstall"
2793
  HTYPE = constants.HTYPE_INSTANCE
2794
  _OP_REQP = ["instance_name"]
2795
  REQ_BGL = False
2796

    
2797
  def ExpandNames(self):
2798
    self._ExpandAndLockInstance()
2799

    
2800
  def BuildHooksEnv(self):
2801
    """Build hooks env.
2802

2803
    This runs on master, primary and secondary nodes of the instance.
2804

2805
    """
2806
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2807
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2808
    return env, nl, nl
2809

    
2810
  def CheckPrereq(self):
2811
    """Check prerequisites.
2812

2813
    This checks that the instance is in the cluster and is not running.
2814

2815
    """
2816
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2817
    assert instance is not None, \
2818
      "Cannot retrieve locked instance %s" % self.op.instance_name
2819
    _CheckNodeOnline(self, instance.primary_node)
2820

    
2821
    if instance.disk_template == constants.DT_DISKLESS:
2822
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2823
                                 self.op.instance_name)
2824
    if instance.status != "down":
2825
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2826
                                 self.op.instance_name)
2827
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2828
                                              instance.name,
2829
                                              instance.hypervisor)
2830
    if remote_info.failed or remote_info.data:
2831
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2832
                                 (self.op.instance_name,
2833
                                  instance.primary_node))
2834

    
2835
    self.op.os_type = getattr(self.op, "os_type", None)
2836
    if self.op.os_type is not None:
2837
      # OS verification
2838
      pnode = self.cfg.GetNodeInfo(
2839
        self.cfg.ExpandNodeName(instance.primary_node))
2840
      if pnode is None:
2841
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2842
                                   instance.primary_node)
2843
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2844
      result.Raise()
2845
      if not isinstance(result.data, objects.OS):
2846
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2847
                                   " primary node"  % self.op.os_type)
2848

    
2849
    self.instance = instance
2850

    
2851
  def Exec(self, feedback_fn):
2852
    """Reinstall the instance.
2853

2854
    """
2855
    inst = self.instance
2856

    
2857
    if self.op.os_type is not None:
2858
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2859
      inst.os = self.op.os_type
2860
      self.cfg.Update(inst)
2861

    
2862
    _StartInstanceDisks(self, inst, None)
2863
    try:
2864
      feedback_fn("Running the instance OS create scripts...")
2865
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2866
      msg = result.RemoteFailMsg()
2867
      if msg:
2868
        raise errors.OpExecError("Could not install OS for instance %s"
2869
                                 " on node %s: %s" %
2870
                                 (inst.name, inst.primary_node, msg))
2871
    finally:
2872
      _ShutdownInstanceDisks(self, inst)
2873

    
2874

    
2875
class LURenameInstance(LogicalUnit):
2876
  """Rename an instance.
2877

2878
  """
2879
  HPATH = "instance-rename"
2880
  HTYPE = constants.HTYPE_INSTANCE
2881
  _OP_REQP = ["instance_name", "new_name"]
2882

    
2883
  def BuildHooksEnv(self):
2884
    """Build hooks env.
2885

2886
    This runs on master, primary and secondary nodes of the instance.
2887

2888
    """
2889
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2890
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2891
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2892
    return env, nl, nl
2893

    
2894
  def CheckPrereq(self):
2895
    """Check prerequisites.
2896

2897
    This checks that the instance is in the cluster and is not running.
2898

2899
    """
2900
    instance = self.cfg.GetInstanceInfo(
2901
      self.cfg.ExpandInstanceName(self.op.instance_name))
2902
    if instance is None:
2903
      raise errors.OpPrereqError("Instance '%s' not known" %
2904
                                 self.op.instance_name)
2905
    _CheckNodeOnline(self, instance.primary_node)
2906

    
2907
    if instance.status != "down":
2908
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2909
                                 self.op.instance_name)
2910
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2911
                                              instance.name,
2912
                                              instance.hypervisor)
2913
    remote_info.Raise()
2914
    if remote_info.data:
2915
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2916
                                 (self.op.instance_name,
2917
                                  instance.primary_node))
2918
    self.instance = instance
2919

    
2920
    # new name verification
2921
    name_info = utils.HostInfo(self.op.new_name)
2922

    
2923
    self.op.new_name = new_name = name_info.name
2924
    instance_list = self.cfg.GetInstanceList()
2925
    if new_name in instance_list:
2926
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2927
                                 new_name)
2928

    
2929
    if not getattr(self.op, "ignore_ip", False):
2930
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2931
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2932
                                   (name_info.ip, new_name))
2933

    
2934

    
2935
  def Exec(self, feedback_fn):
2936
    """Reinstall the instance.
2937

2938
    """
2939
    inst = self.instance
2940
    old_name = inst.name
2941

    
2942
    if inst.disk_template == constants.DT_FILE:
2943
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2944

    
2945
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2946
    # Change the instance lock. This is definitely safe while we hold the BGL
2947
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2948
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2949

    
2950
    # re-read the instance from the configuration after rename
2951
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2952

    
2953
    if inst.disk_template == constants.DT_FILE:
2954
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2955
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2956
                                                     old_file_storage_dir,
2957
                                                     new_file_storage_dir)
2958
      result.Raise()
2959
      if not result.data:
2960
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2961
                                 " directory '%s' to '%s' (but the instance"
2962
                                 " has been renamed in Ganeti)" % (
2963
                                 inst.primary_node, old_file_storage_dir,
2964
                                 new_file_storage_dir))
2965

    
2966
      if not result.data[0]:
2967
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2968
                                 " (but the instance has been renamed in"
2969
                                 " Ganeti)" % (old_file_storage_dir,
2970
                                               new_file_storage_dir))
2971

    
2972
    _StartInstanceDisks(self, inst, None)
2973
    try:
2974
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2975
                                                 old_name)
2976
      msg = result.RemoteFailMsg()
2977
      if msg:
2978
        msg = ("Could not run OS rename script for instance %s on node %s"
2979
               " (but the instance has been renamed in Ganeti): %s" %
2980
               (inst.name, inst.primary_node, msg))
2981
        self.proc.LogWarning(msg)
2982
    finally:
2983
      _ShutdownInstanceDisks(self, inst)
2984

    
2985

    
2986
class LURemoveInstance(LogicalUnit):
2987
  """Remove an instance.
2988

2989
  """
2990
  HPATH = "instance-remove"
2991
  HTYPE = constants.HTYPE_INSTANCE
2992
  _OP_REQP = ["instance_name", "ignore_failures"]
2993
  REQ_BGL = False
2994

    
2995
  def ExpandNames(self):
2996
    self._ExpandAndLockInstance()
2997
    self.needed_locks[locking.LEVEL_NODE] = []
2998
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2999

    
3000
  def DeclareLocks(self, level):
3001
    if level == locking.LEVEL_NODE:
3002
      self._LockInstancesNodes()
3003

    
3004
  def BuildHooksEnv(self):
3005
    """Build hooks env.
3006

3007
    This runs on the master node only.
3008

3009
    """
3010
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3011
    nl = [self.cfg.GetMasterNode()]
3012
    return env, nl, nl
3013

    
3014
  def CheckPrereq(self):
3015
    """Check prerequisites.
3016

3017
    This checks that the instance is in the cluster.
3018

3019
    """
3020
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3021
    assert self.instance is not None, \
3022
      "Cannot retrieve locked instance %s" % self.op.instance_name
3023

    
3024
  def Exec(self, feedback_fn):
3025
    """Remove the instance.
3026

3027
    """
3028
    instance = self.instance
3029
    logging.info("Shutting down instance %s on node %s",
3030
                 instance.name, instance.primary_node)
3031

    
3032
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3033
    if result.failed or not result.data:
3034
      if self.op.ignore_failures:
3035
        feedback_fn("Warning: can't shutdown instance")
3036
      else:
3037
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3038
                                 (instance.name, instance.primary_node))
3039

    
3040
    logging.info("Removing block devices for instance %s", instance.name)
3041

    
3042
    if not _RemoveDisks(self, instance):
3043
      if self.op.ignore_failures:
3044
        feedback_fn("Warning: can't remove instance's disks")
3045
      else:
3046
        raise errors.OpExecError("Can't remove instance's disks")
3047

    
3048
    logging.info("Removing instance %s out of cluster config", instance.name)
3049

    
3050
    self.cfg.RemoveInstance(instance.name)
3051
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3052

    
3053

    
3054
class LUQueryInstances(NoHooksLU):
3055
  """Logical unit for querying instances.
3056

3057
  """
3058
  _OP_REQP = ["output_fields", "names"]
3059
  REQ_BGL = False
3060
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3061
                                    "admin_state", "admin_ram",
3062
                                    "disk_template", "ip", "mac", "bridge",
3063
                                    "sda_size", "sdb_size", "vcpus", "tags",
3064
                                    "network_port", "beparams",
3065
                                    "(disk).(size)/([0-9]+)",
3066
                                    "(disk).(sizes)",
3067
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
3068
                                    "(nic).(macs|ips|bridges)",
3069
                                    "(disk|nic).(count)",
3070
                                    "serial_no", "hypervisor", "hvparams",] +
3071
                                  ["hv/%s" % name
3072
                                   for name in constants.HVS_PARAMETERS] +
3073
                                  ["be/%s" % name
3074
                                   for name in constants.BES_PARAMETERS])
3075
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3076

    
3077

    
3078
  def ExpandNames(self):
3079
    _CheckOutputFields(static=self._FIELDS_STATIC,
3080
                       dynamic=self._FIELDS_DYNAMIC,
3081
                       selected=self.op.output_fields)
3082

    
3083
    self.needed_locks = {}
3084
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3085
    self.share_locks[locking.LEVEL_NODE] = 1
3086

    
3087
    if self.op.names:
3088
      self.wanted = _GetWantedInstances(self, self.op.names)
3089
    else:
3090
      self.wanted = locking.ALL_SET
3091

    
3092
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3093
    if self.do_locking:
3094
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3095
      self.needed_locks[locking.LEVEL_NODE] = []
3096
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3097

    
3098
  def DeclareLocks(self, level):
3099
    if level == locking.LEVEL_NODE and self.do_locking:
3100
      self._LockInstancesNodes()
3101

    
3102
  def CheckPrereq(self):
3103
    """Check prerequisites.
3104

3105
    """
3106
    pass
3107

    
3108
  def Exec(self, feedback_fn):
3109
    """Computes the list of nodes and their attributes.
3110

3111
    """
3112
    all_info = self.cfg.GetAllInstancesInfo()
3113
    if self.do_locking:
3114
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3115
    elif self.wanted != locking.ALL_SET:
3116
      instance_names = self.wanted
3117
      missing = set(instance_names).difference(all_info.keys())
3118
      if missing:
3119
        raise errors.OpExecError(
3120
          "Some instances were removed before retrieving their data: %s"
3121
          % missing)
3122
    else:
3123
      instance_names = all_info.keys()
3124

    
3125
    instance_names = utils.NiceSort(instance_names)
3126
    instance_list = [all_info[iname] for iname in instance_names]
3127

    
3128
    # begin data gathering
3129

    
3130
    nodes = frozenset([inst.primary_node for inst in instance_list])
3131
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3132

    
3133
    bad_nodes = []
3134
    off_nodes = []
3135
    if self.do_locking:
3136
      live_data = {}
3137
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3138
      for name in nodes:
3139
        result = node_data[name]
3140
        if result.offline:
3141
          # offline nodes will be in both lists
3142
          off_nodes.append(name)
3143
        if result.failed:
3144
          bad_nodes.append(name)
3145
        else:
3146
          if result.data:
3147
            live_data.update(result.data)
3148
            # else no instance is alive
3149
    else:
3150
      live_data = dict([(name, {}) for name in instance_names])
3151

    
3152
    # end data gathering
3153

    
3154
    HVPREFIX = "hv/"
3155
    BEPREFIX = "be/"
3156
    output = []
3157
    for instance in instance_list:
3158
      iout = []
3159
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3160
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3161
      for field in self.op.output_fields:
3162
        st_match = self._FIELDS_STATIC.Matches(field)
3163
        if field == "name":
3164
          val = instance.name
3165
        elif field == "os":
3166
          val = instance.os
3167
        elif field == "pnode":
3168
          val = instance.primary_node
3169
        elif field == "snodes":
3170
          val = list(instance.secondary_nodes)
3171
        elif field == "admin_state":
3172
          val = (instance.status != "down")
3173
        elif field == "oper_state":
3174
          if instance.primary_node in bad_nodes:
3175
            val = None
3176
          else:
3177
            val = bool(live_data.get(instance.name))
3178
        elif field == "status":
3179
          if instance.primary_node in off_nodes:
3180
            val = "ERROR_nodeoffline"
3181
          elif instance.primary_node in bad_nodes:
3182
            val = "ERROR_nodedown"
3183
          else:
3184
            running = bool(live_data.get(instance.name))
3185
            if running:
3186
              if instance.status != "down":
3187
                val = "running"
3188
              else:
3189
                val = "ERROR_up"
3190
            else:
3191
              if instance.status != "down":
3192
                val = "ERROR_down"
3193
              else:
3194
                val = "ADMIN_down"
3195
        elif field == "oper_ram":
3196
          if instance.primary_node in bad_nodes:
3197
            val = None
3198
          elif instance.name in live_data:
3199
            val = live_data[instance.name].get("memory", "?")
3200
          else:
3201
            val = "-"
3202
        elif field == "disk_template":
3203
          val = instance.disk_template
3204
        elif field == "ip":
3205
          val = instance.nics[0].ip
3206
        elif field == "bridge":
3207
          val = instance.nics[0].bridge
3208
        elif field == "mac":
3209
          val = instance.nics[0].mac
3210
        elif field == "sda_size" or field == "sdb_size":
3211
          idx = ord(field[2]) - ord('a')
3212
          try:
3213
            val = instance.FindDisk(idx).size
3214
          except errors.OpPrereqError:
3215
            val = None
3216
        elif field == "tags":
3217
          val = list(instance.GetTags())
3218
        elif field == "serial_no":
3219
          val = instance.serial_no
3220
        elif field == "network_port":
3221
          val = instance.network_port
3222
        elif field == "hypervisor":
3223
          val = instance.hypervisor
3224
        elif field == "hvparams":
3225
          val = i_hv
3226
        elif (field.startswith(HVPREFIX) and
3227
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3228
          val = i_hv.get(field[len(HVPREFIX):], None)
3229
        elif field == "beparams":
3230
          val = i_be
3231
        elif (field.startswith(BEPREFIX) and
3232
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3233
          val = i_be.get(field[len(BEPREFIX):], None)
3234
        elif st_match and st_match.groups():
3235
          # matches a variable list
3236
          st_groups = st_match.groups()
3237
          if st_groups and st_groups[0] == "disk":
3238
            if st_groups[1] == "count":
3239
              val = len(instance.disks)
3240
            elif st_groups[1] == "sizes":
3241
              val = [disk.size for disk in instance.disks]
3242
            elif st_groups[1] == "size":
3243
              try:
3244
                val = instance.FindDisk(st_groups[2]).size
3245
              except errors.OpPrereqError:
3246
                val = None
3247
            else:
3248
              assert False, "Unhandled disk parameter"
3249
          elif st_groups[0] == "nic":
3250
            if st_groups[1] == "count":
3251
              val = len(instance.nics)
3252
            elif st_groups[1] == "macs":
3253
              val = [nic.mac for nic in instance.nics]
3254
            elif st_groups[1] == "ips":
3255
              val = [nic.ip for nic in instance.nics]
3256
            elif st_groups[1] == "bridges":
3257
              val = [nic.bridge for nic in instance.nics]
3258
            else:
3259
              # index-based item
3260
              nic_idx = int(st_groups[2])
3261
              if nic_idx >= len(instance.nics):
3262
                val = None
3263
              else:
3264
                if st_groups[1] == "mac":
3265
                  val = instance.nics[nic_idx].mac
3266
                elif st_groups[1] == "ip":
3267
                  val = instance.nics[nic_idx].ip
3268
                elif st_groups[1] == "bridge":
3269
                  val = instance.nics[nic_idx].bridge
3270
                else:
3271
                  assert False, "Unhandled NIC parameter"
3272
          else:
3273
            assert False, "Unhandled variable parameter"
3274
        else:
3275
          raise errors.ParameterError(field)
3276
        iout.append(val)
3277
      output.append(iout)
3278

    
3279
    return output
3280

    
3281

    
3282
class LUFailoverInstance(LogicalUnit):
3283
  """Failover an instance.
3284

3285
  """
3286
  HPATH = "instance-failover"
3287
  HTYPE = constants.HTYPE_INSTANCE
3288
  _OP_REQP = ["instance_name", "ignore_consistency"]
3289
  REQ_BGL = False
3290

    
3291
  def ExpandNames(self):
3292
    self._ExpandAndLockInstance()
3293
    self.needed_locks[locking.LEVEL_NODE] = []
3294
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3295

    
3296
  def DeclareLocks(self, level):
3297
    if level == locking.LEVEL_NODE:
3298
      self._LockInstancesNodes()
3299

    
3300
  def BuildHooksEnv(self):
3301
    """Build hooks env.
3302

3303
    This runs on master, primary and secondary nodes of the instance.
3304

3305
    """
3306
    env = {
3307
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3308
      }
3309
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3310
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3311
    return env, nl, nl
3312

    
3313
  def CheckPrereq(self):
3314
    """Check prerequisites.
3315

3316
    This checks that the instance is in the cluster.
3317

3318
    """
3319
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3320
    assert self.instance is not None, \
3321
      "Cannot retrieve locked instance %s" % self.op.instance_name
3322

    
3323
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3324
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3325
      raise errors.OpPrereqError("Instance's disk layout is not"
3326
                                 " network mirrored, cannot failover.")
3327

    
3328
    secondary_nodes = instance.secondary_nodes
3329
    if not secondary_nodes:
3330
      raise errors.ProgrammerError("no secondary node but using "
3331
                                   "a mirrored disk template")
3332

    
3333
    target_node = secondary_nodes[0]
3334
    _CheckNodeOnline(self, target_node)
3335
    # check memory requirements on the secondary node
3336
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3337
                         instance.name, bep[constants.BE_MEMORY],
3338
                         instance.hypervisor)
3339

    
3340
    # check bridge existence
3341
    brlist = [nic.bridge for nic in instance.nics]
3342
    result = self.rpc.call_bridges_exist(target_node, brlist)
3343
    result.Raise()
3344
    if not result.data:
3345
      raise errors.OpPrereqError("One or more target bridges %s does not"
3346
                                 " exist on destination node '%s'" %
3347
                                 (brlist, target_node))
3348

    
3349
  def Exec(self, feedback_fn):
3350
    """Failover an instance.
3351

3352
    The failover is done by shutting it down on its present node and
3353
    starting it on the secondary.
3354

3355
    """
3356
    instance = self.instance
3357

    
3358
    source_node = instance.primary_node
3359
    target_node = instance.secondary_nodes[0]
3360

    
3361
    feedback_fn("* checking disk consistency between source and target")
3362
    for dev in instance.disks:
3363
      # for drbd, these are drbd over lvm
3364
      if not _CheckDiskConsistency(self, dev, target_node, False):
3365
        if instance.status == "up" and not self.op.ignore_consistency:
3366
          raise errors.OpExecError("Disk %s is degraded on target node,"
3367
                                   " aborting failover." % dev.iv_name)
3368

    
3369
    feedback_fn("* shutting down instance on source node")
3370
    logging.info("Shutting down instance %s on node %s",
3371
                 instance.name, source_node)
3372

    
3373
    result = self.rpc.call_instance_shutdown(source_node, instance)
3374
    if result.failed or not result.data:
3375
      if self.op.ignore_consistency:
3376
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3377
                             " Proceeding"
3378
                             " anyway. Please make sure node %s is down",
3379
                             instance.name, source_node, source_node)
3380
      else:
3381
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3382
                                 (instance.name, source_node))
3383

    
3384
    feedback_fn("* deactivating the instance's disks on source node")
3385
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3386
      raise errors.OpExecError("Can't shut down the instance's disks.")
3387

    
3388
    instance.primary_node = target_node
3389
    # distribute new instance config to the other nodes
3390
    self.cfg.Update(instance)
3391

    
3392
    # Only start the instance if it's marked as up
3393
    if instance.status == "up":
3394
      feedback_fn("* activating the instance's disks on target node")
3395
      logging.info("Starting instance %s on node %s",
3396
                   instance.name, target_node)
3397

    
3398
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3399
                                               ignore_secondaries=True)
3400
      if not disks_ok:
3401
        _ShutdownInstanceDisks(self, instance)
3402
        raise errors.OpExecError("Can't activate the instance's disks")
3403

    
3404
      feedback_fn("* starting the instance on the target node")
3405
      result = self.rpc.call_instance_start(target_node, instance, None)
3406
      msg = result.RemoteFailMsg()
3407
      if msg:
3408
        _ShutdownInstanceDisks(self, instance)
3409
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3410
                                 (instance.name, target_node, msg))
3411

    
3412

    
3413
class LUMigrateInstance(LogicalUnit):
3414
  """Migrate an instance.
3415

3416
  This is migration without shutting down, compared to the failover,
3417
  which is done with shutdown.
3418

3419
  """
3420
  HPATH = "instance-migrate"
3421
  HTYPE = constants.HTYPE_INSTANCE
3422
  _OP_REQP = ["instance_name", "live", "cleanup"]
3423

    
3424
  REQ_BGL = False
3425

    
3426
  def ExpandNames(self):
3427
    self._ExpandAndLockInstance()
3428
    self.needed_locks[locking.LEVEL_NODE] = []
3429
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3430

    
3431
  def DeclareLocks(self, level):
3432
    if level == locking.LEVEL_NODE:
3433
      self._LockInstancesNodes()
3434

    
3435
  def BuildHooksEnv(self):
3436
    """Build hooks env.
3437

3438
    This runs on master, primary and secondary nodes of the instance.
3439

3440
    """
3441
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3442
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3443
    return env, nl, nl
3444

    
3445
  def CheckPrereq(self):
3446
    """Check prerequisites.
3447

3448
    This checks that the instance is in the cluster.
3449

3450
    """
3451
    instance = self.cfg.GetInstanceInfo(
3452
      self.cfg.ExpandInstanceName(self.op.instance_name))
3453
    if instance is None:
3454
      raise errors.OpPrereqError("Instance '%s' not known" %
3455
                                 self.op.instance_name)
3456

    
3457
    if instance.disk_template != constants.DT_DRBD8:
3458
      raise errors.OpPrereqError("Instance's disk layout is not"
3459
                                 " drbd8, cannot migrate.")
3460

    
3461
    secondary_nodes = instance.secondary_nodes
3462
    if not secondary_nodes:
3463
      raise errors.ProgrammerError("no secondary node but using "
3464
                                   "drbd8 disk template")
3465

    
3466
    i_be = self.cfg.GetClusterInfo().FillBE(instance)
3467

    
3468
    target_node = secondary_nodes[0]
3469
    # check memory requirements on the secondary node
3470
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3471
                         instance.name, i_be[constants.BE_MEMORY],
3472
                         instance.hypervisor)
3473

    
3474
    # check bridge existence
3475
    brlist = [nic.bridge for nic in instance.nics]
3476
    result = self.rpc.call_bridges_exist(target_node, brlist)
3477
    if result.failed or not result.data:
3478
      raise errors.OpPrereqError("One or more target bridges %s does not"
3479
                                 " exist on destination node '%s'" %
3480
                                 (brlist, target_node))
3481

    
3482
    if not self.op.cleanup:
3483
      result = self.rpc.call_instance_migratable(instance.primary_node,
3484
                                                 instance)
3485
      msg = result.RemoteFailMsg()
3486
      if msg:
3487
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3488
                                   msg)
3489

    
3490
    self.instance = instance
3491

    
3492
  def _WaitUntilSync(self):
3493
    """Poll with custom rpc for disk sync.
3494

3495
    This uses our own step-based rpc call.
3496

3497
    """
3498
    self.feedback_fn("* wait until resync is done")
3499
    all_done = False
3500
    while not all_done:
3501
      all_done = True
3502
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3503
                                            self.nodes_ip,
3504
                                            self.instance.disks)
3505
      min_percent = 100
3506
      for node, nres in result.items():
3507
        msg = nres.RemoteFailMsg()
3508
        if msg:
3509
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3510
                                   (node, msg))
3511
        node_done, node_percent = nres.data[1]
3512
        all_done = all_done and node_done
3513
        if node_percent is not None:
3514
          min_percent = min(min_percent, node_percent)
3515
      if not all_done:
3516
        if min_percent < 100:
3517
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3518
        time.sleep(2)
3519

    
3520
  def _EnsureSecondary(self, node):
3521
    """Demote a node to secondary.
3522

3523
    """
3524
    self.feedback_fn("* switching node %s to secondary mode" % node)
3525

    
3526
    for dev in self.instance.disks:
3527
      self.cfg.SetDiskID(dev, node)
3528

    
3529
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3530
                                          self.instance.disks)
3531
    msg = result.RemoteFailMsg()
3532
    if msg:
3533
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3534
                               " error %s" % (node, msg))
3535

    
3536
  def _GoStandalone(self):
3537
    """Disconnect from the network.
3538

3539
    """
3540
    self.feedback_fn("* changing into standalone mode")
3541
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3542
                                               self.instance.disks)
3543
    for node, nres in result.items():
3544
      msg = nres.RemoteFailMsg()
3545
      if msg:
3546
        raise errors.OpExecError("Cannot disconnect disks node %s,"
3547
                                 " error %s" % (node, msg))
3548

    
3549
  def _GoReconnect(self, multimaster):
3550
    """Reconnect to the network.
3551

3552
    """
3553
    if multimaster:
3554
      msg = "dual-master"
3555
    else:
3556
      msg = "single-master"
3557
    self.feedback_fn("* changing disks into %s mode" % msg)
3558
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3559
                                           self.instance.disks,
3560
                                           self.instance.name, multimaster)
3561
    for node, nres in result.items():
3562
      msg = nres.RemoteFailMsg()
3563
      if msg:
3564
        raise errors.OpExecError("Cannot change disks config on node %s,"
3565
                                 " error: %s" % (node, msg))
3566

    
3567
  def _ExecCleanup(self):
3568
    """Try to cleanup after a failed migration.
3569

3570
    The cleanup is done by:
3571
      - check that the instance is running only on one node
3572
        (and update the config if needed)
3573
      - change disks on its secondary node to secondary
3574
      - wait until disks are fully synchronized
3575
      - disconnect from the network
3576
      - change disks into single-master mode
3577
      - wait again until disks are fully synchronized
3578

3579
    """
3580
    instance = self.instance
3581
    target_node = self.target_node
3582
    source_node = self.source_node
3583

    
3584
    # check running on only one node
3585
    self.feedback_fn("* checking where the instance actually runs"
3586
                     " (if this hangs, the hypervisor might be in"
3587
                     " a bad state)")
3588
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3589
    for node, result in ins_l.items():
3590
      result.Raise()
3591
      if not isinstance(result.data, list):
3592
        raise errors.OpExecError("Can't contact node '%s'" % node)
3593

    
3594
    runningon_source = instance.name in ins_l[source_node].data
3595
    runningon_target = instance.name in ins_l[target_node].data
3596

    
3597
    if runningon_source and runningon_target:
3598
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3599
                               " or the hypervisor is confused. You will have"
3600
                               " to ensure manually that it runs only on one"
3601
                               " and restart this operation.")
3602

    
3603
    if not (runningon_source or runningon_target):
3604
      raise errors.OpExecError("Instance does not seem to be running at all."
3605
                               " In this case, it's safer to repair by"
3606
                               " running 'gnt-instance stop' to ensure disk"
3607
                               " shutdown, and then restarting it.")
3608

    
3609
    if runningon_target:
3610
      # the migration has actually succeeded, we need to update the config
3611
      self.feedback_fn("* instance running on secondary node (%s),"
3612
                       " updating config" % target_node)
3613
      instance.primary_node = target_node
3614
      self.cfg.Update(instance)
3615
      demoted_node = source_node
3616
    else:
3617
      self.feedback_fn("* instance confirmed to be running on its"
3618
                       " primary node (%s)" % source_node)
3619
      demoted_node = target_node
3620

    
3621
    self._EnsureSecondary(demoted_node)
3622
    try:
3623
      self._WaitUntilSync()
3624
    except errors.OpExecError:
3625
      # we ignore errors here, since if the device is standalone, it
3626
      # won't be able to sync
3627
      pass
3628
    self._GoStandalone()
3629
    self._GoReconnect(False)
3630
    self._WaitUntilSync()
3631

    
3632
    self.feedback_fn("* done")
3633

    
3634
  def _RevertDiskStatus(self):
3635
    """Try to revert the disk status after a failed migration.
3636

3637
    """
3638
    target_node = self.target_node
3639
    try:
3640
      self._EnsureSecondary(target_node)
3641
      self._GoStandalone()
3642
      self._GoReconnect(False)
3643
      self._WaitUntilSync()
3644
    except errors.OpExecError, err:
3645
      self.LogWarning("Migration failed and I can't reconnect the"
3646
                      " drives: error '%s'\n"
3647
                      "Please look and recover the instance status" %
3648
                      str(err))
3649

    
3650
  def _AbortMigration(self):
3651
    """Call the hypervisor code to abort a started migration.
3652

3653
    """
3654
    instance = self.instance
3655
    target_node = self.target_node
3656
    migration_info = self.migration_info
3657

    
3658
    abort_result = self.rpc.call_finalize_migration(target_node,
3659
                                                    instance,
3660
                                                    migration_info,
3661
                                                    False)
3662
    abort_msg = abort_result.RemoteFailMsg()
3663
    if abort_msg:
3664
      logging.error("Aborting migration failed on target node %s: %s" %
3665
                    (target_node, abort_msg))
3666
      # Don't raise an exception here, as we still have to try to revert the
3667
      # disk status, even if this step failed.
3668

    
3669
  def _ExecMigration(self):
3670
    """Migrate an instance.
3671

3672
    The migrate is done by:
3673
      - change the disks into dual-master mode
3674
      - wait until disks are fully synchronized again
3675
      - migrate the instance
3676
      - change disks on the new secondary node (the old primary) to secondary
3677
      - wait until disks are fully synchronized
3678
      - change disks into single-master mode
3679

3680
    """
3681
    instance = self.instance
3682
    target_node = self.target_node
3683
    source_node = self.source_node
3684

    
3685
    self.feedback_fn("* checking disk consistency between source and target")
3686
    for dev in instance.disks:
3687
      if not _CheckDiskConsistency(self, dev, target_node, False):
3688
        raise errors.OpExecError("Disk %s is degraded or not fully"
3689
                                 " synchronized on target node,"
3690
                                 " aborting migrate." % dev.iv_name)
3691

    
3692
    # First get the migration information from the remote node
3693
    result = self.rpc.call_migration_info(source_node, instance)
3694
    msg = result.RemoteFailMsg()
3695
    if msg:
3696
      log_err = ("Failed fetching source migration information from %s: %s" %
3697
                  (source_node, msg))
3698
      logging.error(log_err)
3699
      raise errors.OpExecError(log_err)
3700

    
3701
    self.migration_info = migration_info = result.data[1]
3702

    
3703
    # Then switch the disks to master/master mode
3704
    self._EnsureSecondary(target_node)
3705
    self._GoStandalone()
3706
    self._GoReconnect(True)
3707
    self._WaitUntilSync()
3708

    
3709
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3710
    result = self.rpc.call_accept_instance(target_node,
3711
                                           instance,
3712
                                           migration_info,
3713
                                           self.nodes_ip[target_node])
3714

    
3715
    msg = result.RemoteFailMsg()
3716
    if msg:
3717
      logging.error("Instance pre-migration failed, trying to revert"
3718
                    " disk status: %s", msg)
3719
      self._AbortMigration()
3720
      self._RevertDiskStatus()
3721
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3722
                               (instance.name, msg))
3723

    
3724
    self.feedback_fn("* migrating instance to %s" % target_node)
3725
    time.sleep(10)
3726
    result = self.rpc.call_instance_migrate(source_node, instance,
3727
                                            self.nodes_ip[target_node],
3728
                                            self.op.live)
3729
    msg = result.RemoteFailMsg()
3730
    if msg:
3731
      logging.error("Instance migration failed, trying to revert"
3732
                    " disk status: %s", msg)
3733
      self._AbortMigration()
3734
      self._RevertDiskStatus()
3735
      raise errors.OpExecError("Could not migrate instance %s: %s" %
3736
                               (instance.name, msg))
3737
    time.sleep(10)
3738

    
3739
    instance.primary_node = target_node
3740
    # distribute new instance config to the other nodes
3741
    self.cfg.Update(instance)
3742

    
3743
    result = self.rpc.call_finalize_migration(target_node,
3744
                                              instance,
3745
                                              migration_info,
3746
                                              True)
3747
    msg = result.RemoteFailMsg()
3748
    if msg:
3749
      logging.error("Instance migration succeeded, but finalization failed:"
3750
                    " %s" % msg)
3751
      raise errors.OpExecError("Could not finalize instance migration: %s" %
3752
                               msg)
3753

    
3754
    self._EnsureSecondary(source_node)
3755
    self._WaitUntilSync()
3756
    self._GoStandalone()
3757
    self._GoReconnect(False)
3758
    self._WaitUntilSync()
3759

    
3760
    self.feedback_fn("* done")
3761

    
3762
  def Exec(self, feedback_fn):
3763
    """Perform the migration.
3764

3765
    """
3766
    self.feedback_fn = feedback_fn
3767

    
3768
    self.source_node = self.instance.primary_node
3769
    self.target_node = self.instance.secondary_nodes[0]
3770
    self.all_nodes = [self.source_node, self.target_node]
3771
    self.nodes_ip = {
3772
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3773
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3774
      }
3775
    if self.op.cleanup:
3776
      return self._ExecCleanup()
3777
    else:
3778
      return self._ExecMigration()
3779

    
3780

    
3781
def _CreateBlockDev(lu, node, instance, device, force_create,
3782
                    info, force_open):
3783
  """Create a tree of block devices on a given node.
3784

3785
  If this device type has to be created on secondaries, create it and
3786
  all its children.
3787

3788
  If not, just recurse to children keeping the same 'force' value.
3789

3790
  @param lu: the lu on whose behalf we execute
3791
  @param node: the node on which to create the device
3792
  @type instance: L{objects.Instance}
3793
  @param instance: the instance which owns the device
3794
  @type device: L{objects.Disk}
3795
  @param device: the device to create
3796
  @type force_create: boolean
3797
  @param force_create: whether to force creation of this device; this
3798
      will be changed to True whenever we find a device for which
      CreateOnSecondary() returns True
3800
  @param info: the extra 'metadata' we should attach to the device
3801
      (this will be represented as a LVM tag)
3802
  @type force_open: boolean
3803
  @param force_open: this parameter will be passed to the
      L{backend.CreateBlockDevice} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
3807

3808
  """
3809
  if device.CreateOnSecondary():
3810
    force_create = True
3811

    
3812
  if device.children:
3813
    for child in device.children:
3814
      _CreateBlockDev(lu, node, instance, child, force_create,
3815
                      info, force_open)
3816

    
3817
  if not force_create:
3818
    return
3819

    
3820
  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
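# Rough sketch of how force_create propagates (illustrative only): for a
# device type whose CreateOnSecondary() returns True (e.g. DRBD8 disks),
# force_create is flipped to True, so the device and all its children are
# created even on secondary nodes; for a plain LV it keeps the value the
# caller passed in, and nothing is created where it stays False:
#
#   _CreateBlockDev(lu, node, instance, drbd_dev, False, info, False)
#     -> recurses into the data/meta LV children with force_create=True
#     -> then creates the DRBD device itself via _CreateSingleBlockDev()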
3821

    
3822

    
3823
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3824
  """Create a single block device on a given node.
3825

3826
  This will not recurse over children of the device, so they must be
3827
  created in advance.
3828

3829
  @param lu: the lu on whose behalf we execute
3830
  @param node: the node on which to create the device
3831
  @type instance: L{objects.Instance}
3832
  @param instance: the instance which owns the device
3833
  @type device: L{objects.Disk}
3834
  @param device: the device to create
3835
  @param info: the extra 'metadata' we should attach to the device
3836
      (this will be represented as a LVM tag)
3837
  @type force_open: boolean
3838
  @param force_open: this parameter will be passed to the
      L{backend.CreateBlockDevice} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
3842

3843
  """
3844
  lu.cfg.SetDiskID(device, node)
3845
  result = lu.rpc.call_blockdev_create(node, device, device.size,
3846
                                       instance.name, force_open, info)
3847
  msg = result.RemoteFailMsg()
3848
  if msg:
3849
    raise errors.OpExecError("Can't create block device %s on"
3850
                             " node %s for instance %s: %s" %
3851
                             (device, node, instance.name, msg))
3852
  if device.physical_id is None:
3853
    device.physical_id = result.data[1]
3854

    
3855

    
3856
def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate logical volume names, one for each of the
  requested extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
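# Illustrative example: _GenerateUniqueNames(lu, [".disk0", ".disk1"]) returns
# something like ["<unique-id>.disk0", "<unique-id>.disk1"], where each
# <unique-id> is produced by lu.cfg.GenerateUniqueID(); the exact id format is
# an implementation detail of the configuration layer.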
3867

    
3868

    
3869
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
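# Rough sketch of the device tree built above for a 1024 MB disk (names and
# sizes purely illustrative, sizes in MB):
#
#   DRBD8 device, size=1024,
#                 logical_id=(pnode, snode, port, p_minor, s_minor, secret)
#     +- LV "<id>.disk0_data", size=1024  (data volume)
#     +- LV "<id>.disk0_meta", size=128   (DRBD metadata volume)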
3888

    
3889

    
3890
def _GenerateDiskTemplate(lu, template_name,
3891
                          instance_name, primary_node,
3892
                          secondary_nodes, disk_info,
3893
                          file_storage_dir, file_driver,
3894
                          base_index):
3895
  """Generate the entire disk layout for a given template type.
3896

3897
  """
3898
  #TODO: compute space requirements
3899

    
3900
  vgname = lu.cfg.GetVGName()
3901
  disk_count = len(disk_info)
3902
  disks = []
3903
  if template_name == constants.DT_DISKLESS:
3904
    pass
3905
  elif template_name == constants.DT_PLAIN:
3906
    if len(secondary_nodes) != 0:
3907
      raise errors.ProgrammerError("Wrong template configuration")
3908

    
3909
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3910
                                      for i in range(disk_count)])
3911
    for idx, disk in enumerate(disk_info):
3912
      disk_index = idx + base_index
3913
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3914
                              logical_id=(vgname, names[idx]),
3915
                              iv_name="disk/%d" % disk_index)
3916
      disks.append(disk_dev)
3917
  elif template_name == constants.DT_DRBD8:
3918
    if len(secondary_nodes) != 1:
3919
      raise errors.ProgrammerError("Wrong template configuration")
3920
    remote_node = secondary_nodes[0]
3921
    minors = lu.cfg.AllocateDRBDMinor(
3922
      [primary_node, remote_node] * len(disk_info), instance_name)
3923

    
3924
    names = []
3925
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3926
                                               for i in range(disk_count)]):
3927
      names.append(lv_prefix + "_data")
3928
      names.append(lv_prefix + "_meta")
3929
    for idx, disk in enumerate(disk_info):
3930
      disk_index = idx + base_index
3931
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3932
                                      disk["size"], names[idx*2:idx*2+2],
3933
                                      "disk/%d" % disk_index,
3934
                                      minors[idx*2], minors[idx*2+1])
3935
      disks.append(disk_dev)
3936
  elif template_name == constants.DT_FILE:
3937
    if len(secondary_nodes) != 0:
3938
      raise errors.ProgrammerError("Wrong template configuration")
3939

    
3940
    for idx, disk in enumerate(disk_info):
3941
      disk_index = idx + base_index
3942
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3943
                              iv_name="disk/%d" % disk_index,
3944
                              logical_id=(file_driver,
3945
                                          "%s/disk%d" % (file_storage_dir,
3946
                                                         idx)))
3947
      disks.append(disk_dev)
3948
  else:
3949
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3950
  return disks
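# Worked example (illustrative): for template_name == constants.DT_DRBD8 with
# two disks and base_index == 0, the code above allocates four DRBD minors
# (primary/secondary for each disk), generates the LV name pairs
# "<id>.disk0_data"/"<id>.disk0_meta" and "<id>.disk1_data"/"<id>.disk1_meta",
# and returns two DRBD8 disk objects with iv_names "disk/0" and "disk/1".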
3951

    
3952

    
3953
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
3958

    
3959

    
3960
def _CreateDisks(lu, instance):
3961
  """Create all disks for an instance.
3962

3963
  This abstracts away some work from AddInstance.
3964

3965
  @type lu: L{LogicalUnit}
3966
  @param lu: the logical unit on whose behalf we execute
3967
  @type instance: L{objects.Instance}
3968
  @param instance: the instance whose disks we should create
3969
  @rtype: boolean
3970
  @return: the success of the creation
3971

3972
  """
3973
  info = _GetInstanceInfoText(instance)
3974
  pnode = instance.primary_node
3975

    
3976
  if instance.disk_template == constants.DT_FILE:
3977
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3978
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
3979

    
3980
    if result.failed or not result.data:
3981
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
3982

    
3983
    if not result.data[0]:
3984
      raise errors.OpExecError("Failed to create directory '%s'" %
3985
                               file_storage_dir)
3986

    
3987
  # Note: this needs to be kept in sync with adding of disks in
3988
  # LUSetInstanceParams
3989
  for device in instance.disks:
3990
    logging.info("Creating volume %s for instance %s",
3991
                 device.iv_name, instance.name)
3992
    #HARDCODE
3993
    for node in instance.all_nodes:
3994
      f_create = node == pnode
3995
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
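# Sketch of the effective calls in the loop above for a two-node DRBD instance
# (illustrative only):
#
#   on the primary:   _CreateBlockDev(..., force_create=True,  force_open=True)
#   on the secondary: _CreateBlockDev(..., force_create=False, force_open=False)
#
# DRBD8 devices are still created on the secondary because CreateOnSecondary()
# flips force_create to True inside _CreateBlockDev; plain LV disks are not.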
3996

    
3997

    
3998
def _RemoveDisks(lu, instance):
3999
  """Remove all disks for an instance.
4000

4001
  This abstracts away some work from `AddInstance()` and
4002
  `RemoveInstance()`. Note that in case some of the devices couldn't
4003
  be removed, the removal will continue with the other ones (compare
4004
  with `_CreateDisks()`).
4005

4006
  @type lu: L{LogicalUnit}
4007
  @param lu: the logical unit on whose behalf we execute
4008
  @type instance: L{objects.Instance}
4009
  @param instance: the instance whose disks we should remove
4010
  @rtype: boolean
4011
  @return: the success of the removal
4012

4013
  """
4014
  logging.info("Removing block devices for instance %s", instance.name)
4015

    
4016
  result = True
4017
  for device in instance.disks:
4018
    for node, disk in device.ComputeNodeTree(instance.primary_node):
4019
      lu.cfg.SetDiskID(disk, node)
4020
      result = lu.rpc.call_blockdev_remove(node, disk)
4021
      if result.failed or not result.data:
4022
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
4023
                           " continuing anyway", device.iv_name, node)
4024
        result = False
4025

    
4026
  if instance.disk_template == constants.DT_FILE:
4027
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4028
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4029
                                                 file_storage_dir)
4030
    if result.failed or not result.data:
4031
      logging.error("Could not remove directory '%s'", file_storage_dir)
4032
      result = False
4033

    
4034
  return result
4035

    
4036

    
4037
def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.OpPrereqError("Disk template '%s' size requirement"
                               " is unknown" % disk_template)

  return req_size_dict[disk_template]
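# Worked example (illustrative sizes, in MB): for two disks of 1024 and 2048,
# constants.DT_DRBD8 requires (1024 + 128) + (2048 + 128) = 3328 in the volume
# group, constants.DT_PLAIN requires 1024 + 2048 = 3072, while diskless and
# file-based instances need no volume group space at all (None).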
4055

    
4056

    
4057
def _CheckHVParams(lu, nodenames, hvname, hvparams):
4058
  """Hypervisor parameter validation.
4059

4060
  This function abstracts the hypervisor parameter validation to be
4061
  used in both instance create and instance modify.
4062

4063
  @type lu: L{LogicalUnit}
4064
  @param lu: the logical unit for which we check
4065
  @type nodenames: list
4066
  @param nodenames: the list of nodes on which we should check
4067
  @type hvname: string
4068
  @param hvname: the name of the hypervisor we should use
4069
  @type hvparams: dict
4070
  @param hvparams: the parameters which we need to check
4071
  @raise errors.OpPrereqError: if the parameters are not valid
4072

4073
  """
4074
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4075
                                                  hvname,
4076
                                                  hvparams)
4077
  for node in nodenames:
4078
    info = hvinfo[node]
4079
    info.Raise()
4080
    if not info.data or not isinstance(info.data, (tuple, list)):
4081
      raise errors.OpPrereqError("Cannot get current information"
4082
                                 " from node '%s' (%s)" % (node, info.data))
4083
    if not info.data[0]:
4084
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4085
                                 " %s" % info.data[1])
4086

    
4087

    
4088
class LUCreateInstance(LogicalUnit):
4089
  """Create an instance.
4090

4091
  """
4092
  HPATH = "instance-add"
4093
  HTYPE = constants.HTYPE_INSTANCE
4094
  _OP_REQP = ["instance_name", "disks", "disk_template",
4095
              "mode", "start",
4096
              "wait_for_sync", "ip_check", "nics",
4097
              "hvparams", "beparams"]
4098
  REQ_BGL = False
4099

    
4100
  def _ExpandNode(self, node):
4101
    """Expands and checks one node name.
4102

4103
    """
4104
    node_full = self.cfg.ExpandNodeName(node)
4105
    if node_full is None:
4106
      raise errors.OpPrereqError("Unknown node %s" % node)
4107
    return node_full
4108

    
4109
  def ExpandNames(self):
4110
    """ExpandNames for CreateInstance.
4111

4112
    Figure out the right locks for instance creation.
4113

4114
    """
4115
    self.needed_locks = {}
4116

    
4117
    # set optional parameters to none if they don't exist
4118
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4119
      if not hasattr(self.op, attr):
4120
        setattr(self.op, attr, None)
4121

    
4122
    # cheap checks, mostly valid constants given
4123

    
4124
    # verify creation mode
4125
    if self.op.mode not in (constants.INSTANCE_CREATE,
4126
                            constants.INSTANCE_IMPORT):
4127
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4128
                                 self.op.mode)
4129

    
4130
    # disk template and mirror node verification
4131
    if self.op.disk_template not in constants.DISK_TEMPLATES:
4132
      raise errors.OpPrereqError("Invalid disk template name")
4133

    
4134
    if self.op.hypervisor is None:
4135
      self.op.hypervisor = self.cfg.GetHypervisorType()
4136

    
4137
    cluster = self.cfg.GetClusterInfo()
4138
    enabled_hvs = cluster.enabled_hypervisors
4139
    if self.op.hypervisor not in enabled_hvs:
4140
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4141
                                 " cluster (%s)" % (self.op.hypervisor,
4142
                                  ",".join(enabled_hvs)))
4143

    
4144
    # check hypervisor parameter syntax (locally)
4145

    
4146
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4147
                                  self.op.hvparams)
4148
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4149
    hv_type.CheckParameterSyntax(filled_hvp)
4150

    
4151
    # fill and remember the beparams dict
4152
    utils.CheckBEParams(self.op.beparams)
4153
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4154
                                    self.op.beparams)
4155

    
4156
    #### instance parameters check
4157

    
4158
    # instance name verification
4159
    hostname1 = utils.HostInfo(self.op.instance_name)
4160
    self.op.instance_name = instance_name = hostname1.name
4161

    
4162
    # this is just a preventive check, but someone might still add this
4163
    # instance in the meantime, and creation will fail at lock-add time
4164
    if instance_name in self.cfg.GetInstanceList():
4165
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4166
                                 instance_name)
4167

    
4168
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4169

    
4170
    # NIC buildup
4171
    self.nics = []
4172
    for nic in self.op.nics:
4173
      # ip validity checks
4174
      ip = nic.get("ip", None)
4175
      if ip is None or ip.lower() == "none":
4176
        nic_ip = None
4177
      elif ip.lower() == constants.VALUE_AUTO:
4178
        nic_ip = hostname1.ip
4179
      else:
4180
        if not utils.IsValidIP(ip):
4181
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4182
                                     " like a valid IP" % ip)
4183
        nic_ip = ip
4184

    
4185
      # MAC address verification
4186
      mac = nic.get("mac", constants.VALUE_AUTO)
4187
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4188
        if not utils.IsValidMac(mac.lower()):
4189
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4190
                                     mac)
4191
      # bridge verification
4192
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
4193
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4194

    
4195
    # disk checks/pre-build
4196
    self.disks = []
4197
    for disk in self.op.disks:
4198
      mode = disk.get("mode", constants.DISK_RDWR)
4199
      if mode not in constants.DISK_ACCESS_SET:
4200
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4201
                                   mode)
4202
      size = disk.get("size", None)
4203
      if size is None:
4204
        raise errors.OpPrereqError("Missing disk size")
4205
      try:
4206
        size = int(size)
4207
      except ValueError:
4208
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4209
      self.disks.append({"size": size, "mode": mode})
4210

    
4211
    # used in CheckPrereq for ip ping check
4212
    self.check_ip = hostname1.ip
4213

    
4214
    # file storage checks
4215
    if (self.op.file_driver and
4216
        not self.op.file_driver in constants.FILE_DRIVER):
4217
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
4218
                                 self.op.file_driver)
4219

    
4220
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4221
      raise errors.OpPrereqError("File storage directory path not absolute")
4222

    
4223
    ### Node/iallocator related checks
4224
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
4225
      raise errors.OpPrereqError("One and only one of iallocator and primary"
4226
                                 " node must be given")
4227

    
4228
    if self.op.iallocator:
4229
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4230
    else:
4231
      self.op.pnode = self._ExpandNode(self.op.pnode)
4232
      nodelist = [self.op.pnode]
4233
      if self.op.snode is not None:
4234
        self.op.snode = self._ExpandNode(self.op.snode)
4235
        nodelist.append(self.op.snode)
4236
      self.needed_locks[locking.LEVEL_NODE] = nodelist
4237

    
4238
    # in case of import lock the source node too
4239
    if self.op.mode == constants.INSTANCE_IMPORT:
4240
      src_node = getattr(self.op, "src_node", None)
4241
      src_path = getattr(self.op, "src_path", None)
4242

    
4243
      if src_path is None:
4244
        self.op.src_path = src_path = self.op.instance_name
4245

    
4246
      if src_node is None:
4247
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4248
        self.op.src_node = None
4249
        if os.path.isabs(src_path):
4250
          raise errors.OpPrereqError("Importing an instance from an absolute"
4251
                                     " path requires a source node option.")
4252
      else:
4253
        self.op.src_node = src_node = self._ExpandNode(src_node)
4254
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4255
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
4256
        if not os.path.isabs(src_path):
4257
          self.op.src_path = src_path = \
4258
            os.path.join(constants.EXPORT_DIR, src_path)
4259

    
4260
    else: # INSTANCE_CREATE
4261
      if getattr(self.op, "os_type", None) is None:
4262
        raise errors.OpPrereqError("No guest OS specified")
4263

    
4264
  def _RunAllocator(self):
4265
    """Run the allocator based on input opcode.
4266

4267
    """
4268
    nics = [n.ToDict() for n in self.nics]
4269
    ial = IAllocator(self,
4270
                     mode=constants.IALLOCATOR_MODE_ALLOC,
4271
                     name=self.op.instance_name,
4272
                     disk_template=self.op.disk_template,
4273
                     tags=[],
4274
                     os=self.op.os_type,
4275
                     vcpus=self.be_full[constants.BE_VCPUS],
4276
                     mem_size=self.be_full[constants.BE_MEMORY],
4277
                     disks=self.disks,
4278
                     nics=nics,
4279
                     hypervisor=self.op.hypervisor,
4280
                     )
4281

    
4282
    ial.Run(self.op.iallocator)
4283

    
4284
    if not ial.success:
4285
      raise errors.OpPrereqError("Can't compute nodes using"
4286
                                 " iallocator '%s': %s" % (self.op.iallocator,
4287
                                                           ial.info))
4288
    if len(ial.nodes) != ial.required_nodes:
4289
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4290
                                 " of nodes (%s), required %s" %
4291
                                 (self.op.iallocator, len(ial.nodes),
4292
                                  ial.required_nodes))
4293
    self.op.pnode = ial.nodes[0]
4294
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4295
                 self.op.instance_name, self.op.iallocator,
4296
                 ", ".join(ial.nodes))
4297
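    # mirrored disk templates need a second node, which becomes the secondary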
    if ial.required_nodes == 2:
4298
      self.op.snode = ial.nodes[1]
4299

    
4300
  def BuildHooksEnv(self):
4301
    """Build hooks env.
4302

4303
    This runs on master, primary and secondary nodes of the instance.
4304

4305
    """
4306
    env = {
4307
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4308
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4309
      "INSTANCE_ADD_MODE": self.op.mode,
4310
      }
4311
    if self.op.mode == constants.INSTANCE_IMPORT:
4312
      env["INSTANCE_SRC_NODE"] = self.op.src_node
4313
      env["INSTANCE_SRC_PATH"] = self.op.src_path
4314
      env["INSTANCE_SRC_IMAGES"] = self.src_images
4315

    
4316
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4317
      primary_node=self.op.pnode,
4318
      secondary_nodes=self.secondaries,
4319
      status=self.instance_status,
4320
      os_type=self.op.os_type,
4321
      memory=self.be_full[constants.BE_MEMORY],
4322
      vcpus=self.be_full[constants.BE_VCPUS],
4323
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4324
    ))
4325

    
4326
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4327
          self.secondaries)
4328
    return env, nl, nl
4329

    
4330

    
4331
  def CheckPrereq(self):
4332
    """Check prerequisites.
4333

4334
    """
4335
    if (not self.cfg.GetVGName() and
4336
        self.op.disk_template not in constants.DTS_NOT_LVM):
4337
      raise errors.OpPrereqError("Cluster does not support lvm-based"
4338
                                 " instances")
4339

    
4340

    
4341
    if self.op.mode == constants.INSTANCE_IMPORT:
4342
      src_node = self.op.src_node
4343
      src_path = self.op.src_path
4344

    
4345
      if src_node is None:
4346
        exp_list = self.rpc.call_export_list(
4347
          self.acquired_locks[locking.LEVEL_NODE])
4348
        found = False
4349
        for node in exp_list:
4350
          if not exp_list[node].failed and src_path in exp_list[node].data:
4351
            found = True
4352
            self.op.src_node = src_node = node
4353
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4354
                                                       src_path)
4355
            break
4356
        if not found:
4357
          raise errors.OpPrereqError("No export found for relative path %s" %
4358
                                      src_path)
4359

    
4360
      _CheckNodeOnline(self, src_node)
4361
      result = self.rpc.call_export_info(src_node, src_path)
4362
      result.Raise()
4363
      if not result.data:
4364
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
4365

    
4366
      export_info = result.data
4367
      if not export_info.has_section(constants.INISECT_EXP):
4368
        raise errors.ProgrammerError("Corrupted export config")
4369

    
4370
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
4371
      if int(ei_version) != constants.EXPORT_VERSION:
4372
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4373
                                   (ei_version, constants.EXPORT_VERSION))
4374

    
4375
      # Check that the new instance doesn't have fewer disks than the export
4376
      instance_disks = len(self.disks)
4377
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4378
      if instance_disks < export_disks:
4379
        raise errors.OpPrereqError("Not enough disks to import."
4380
                                   " (instance: %d, export: %d)" %
4381
                                   (instance_disks, export_disks))
4382

    
4383
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4384
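      # one entry per export disk: the dump path if present, False when the
      # export has no image for that index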
      disk_images = []
4385
      for idx in range(export_disks):
4386
        option = 'disk%d_dump' % idx
4387
        if export_info.has_option(constants.INISECT_INS, option):
4388
          # FIXME: are the old OSes, disk sizes, etc. useful?
4389
          export_name = export_info.get(constants.INISECT_INS, option)
4390
          image = os.path.join(src_path, export_name)
4391
          disk_images.append(image)
4392
        else:
4393
          disk_images.append(False)
4394

    
4395
      self.src_images = disk_images
4396

    
4397
      old_name = export_info.get(constants.INISECT_INS, 'name')
4398
      # FIXME: int() here could throw a ValueError on broken exports
4399
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4400
      if self.op.instance_name == old_name:
4401
        for idx, nic in enumerate(self.nics):
4402
          if nic.mac == constants.VALUE_AUTO and exp_nic_count > idx:
4403
            nic_mac_ini = 'nic%d_mac' % idx
4404
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4405

    
4406
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
4407
    if self.op.start and not self.op.ip_check:
4408
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4409
                                 " adding an instance in start mode")
4410

    
4411
    if self.op.ip_check:
4412
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4413
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
4414
                                   (self.check_ip, self.op.instance_name))
4415

    
4416
    #### allocator run
4417

    
4418
    if self.op.iallocator is not None:
4419
      self._RunAllocator()
4420

    
4421
    #### node related checks
4422

    
4423
    # check primary node
4424
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4425
    assert self.pnode is not None, \
4426
      "Cannot retrieve locked node %s" % self.op.pnode
4427
    if pnode.offline:
4428
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4429
                                 pnode.name)
4430

    
4431
    self.secondaries = []
4432

    
4433
    # mirror node verification
4434
    if self.op.disk_template in constants.DTS_NET_MIRROR:
4435
      if self.op.snode is None:
4436
        raise errors.OpPrereqError("The networked disk templates need"
4437
                                   " a mirror node")
4438
      if self.op.snode == pnode.name:
4439
        raise errors.OpPrereqError("The secondary node cannot be"
4440
                                   " the primary node.")
4441
      self.secondaries.append(self.op.snode)
4442
      _CheckNodeOnline(self, self.op.snode)
4443

    
4444
    nodenames = [pnode.name] + self.secondaries
4445

    
4446
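    # templates that do not allocate from the node volume group are expected
    # to yield None here, which skips the free-space check below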
    req_size = _ComputeDiskSize(self.op.disk_template,
4447
                                self.disks)
4448

    
4449
    # Check lv size requirements
4450
    if req_size is not None:
4451
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4452
                                         self.op.hypervisor)
4453
      for node in nodenames:
4454
        info = nodeinfo[node]
4455
        info.Raise()
4456
        info = info.data
4457
        if not info:
4458
          raise errors.OpPrereqError("Cannot get current information"
4459
                                     " from node '%s'" % node)
4460
        vg_free = info.get('vg_free', None)
4461
        if not isinstance(vg_free, int):
4462
          raise errors.OpPrereqError("Can't compute free disk space on"
4463
                                     " node %s" % node)
4464
        if req_size > info['vg_free']:
4465
          raise errors.OpPrereqError("Not enough disk space on target node %s."
4466
                                     " %d MB available, %d MB required" %
4467
                                     (node, info['vg_free'], req_size))
4468

    
4469
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4470

    
4471
    # os verification
4472
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4473
    result.Raise()
4474
    if not isinstance(result.data, objects.OS):
4475
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
4476
                                 " primary node"  % self.op.os_type)
4477

    
4478
    # bridge check on primary node
4479
    bridges = [n.bridge for n in self.nics]
4480
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4481
    result.Raise()
4482
    if not result.data:
4483
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
4484
                                 " exist on destination node '%s'" %
4485
                                 (",".join(bridges), pnode.name))
4486

    
4487
    # memory check on primary node
4488
    if self.op.start:
4489
      _CheckNodeFreeMemory(self, self.pnode.name,
4490
                           "creating instance %s" % self.op.instance_name,
4491
                           self.be_full[constants.BE_MEMORY],
4492
                           self.op.hypervisor)
4493

    
4494
    if self.op.start:
4495
      self.instance_status = 'up'
4496
    else:
4497
      self.instance_status = 'down'
4498

    
4499
  def Exec(self, feedback_fn):
4500
    """Create and add the instance to the cluster.
4501

4502
    """
4503
    instance = self.op.instance_name
4504
    pnode_name = self.pnode.name
4505

    
4506
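    # replace the auto/generate placeholders with freshly generated MACs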
    for nic in self.nics:
4507
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4508
        nic.mac = self.cfg.GenerateMAC()
4509

    
4510
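    # hypervisors in HTS_REQ_PORT need a cluster-allocated network port
    # (e.g. for a VNC console); the others get no port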
    ht_kind = self.op.hypervisor
4511
    if ht_kind in constants.HTS_REQ_PORT:
4512
      network_port = self.cfg.AllocatePort()
4513
    else:
4514
      network_port = None
4515

    
4516
    ##if self.op.vnc_bind_address is None:
4517
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4518

    
4519
    # this is needed because os.path.join does not accept None arguments
4520
    if self.op.file_storage_dir is None:
4521
      string_file_storage_dir = ""
4522
    else:
4523
      string_file_storage_dir = self.op.file_storage_dir
4524

    
4525
    # build the full file storage dir path
4526
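    # (i.e. <cluster file storage dir>/<optional subdir>/<instance name>)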
    file_storage_dir = os.path.normpath(os.path.join(
4527
                                        self.cfg.GetFileStorageDir(),
4528
                                        string_file_storage_dir, instance))
4529

    
4530

    
4531
    disks = _GenerateDiskTemplate(self,
4532
                                  self.op.disk_template,
4533
                                  instance, pnode_name,
4534
                                  self.secondaries,
4535
                                  self.disks,
4536
                                  file_storage_dir,
4537
                                  self.op.file_driver,
4538
                                  0)
4539

    
4540
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4541
                            primary_node=pnode_name,
4542
                            nics=self.nics, disks=disks,
4543
                            disk_template=self.op.disk_template,
4544
                            status=self.instance_status,
4545
                            network_port=network_port,
4546
                            beparams=self.op.beparams,
4547
                            hvparams=self.op.hvparams,
4548
                            hypervisor=self.op.hypervisor,
4549
                            )
4550

    
4551
    feedback_fn("* creating instance disks...")
4552
    try:
4553
      _CreateDisks(self, iobj)
4554
    except errors.OpExecError:
4555
      self.LogWarning("Device creation failed, reverting...")
4556
      try:
4557
        _RemoveDisks(self, iobj)
4558
      finally:
4559
        self.cfg.ReleaseDRBDMinors(instance)
4560
        raise
4561

    
4562
    feedback_fn("adding instance %s to cluster config" % instance)
4563

    
4564
    self.cfg.AddInstance(iobj)
4565
    # Declare that we don't want to remove the instance lock anymore, as we've
4566
    # added the instance to the config
4567
    del self.remove_locks[locking.LEVEL_INSTANCE]
4568
    # Remove the temp. assignments for the instance's drbds
4569
    self.cfg.ReleaseDRBDMinors(instance)
4570
    # Unlock all the nodes
4571
    if self.op.mode == constants.INSTANCE_IMPORT:
4572
      nodes_keep = [self.op.src_node]
4573
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4574
                       if node != self.op.src_node]
4575
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4576
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4577
    else:
4578
      self.context.glm.release(locking.LEVEL_NODE)
4579
      del self.acquired_locks[locking.LEVEL_NODE]
4580

    
4581
    if self.op.wait_for_sync:
4582
      disk_abort = not _WaitForSync(self, iobj)
4583
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4584
      # make sure the disks are not degraded (still sync-ing is ok)
4585
      time.sleep(15)
4586
      feedback_fn("* checking mirrors status")
4587
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4588
    else:
4589
      disk_abort = False
4590

    
4591
    if disk_abort:
4592
      _RemoveDisks(self, iobj)
4593
      self.cfg.RemoveInstance(iobj.name)
4594
      # Make sure the instance lock gets removed
4595
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4596
      raise errors.OpExecError("There are some degraded disks for"
4597
                               " this instance")
4598

    
4599
    feedback_fn("creating os for instance %s on node %s" %
4600
                (instance, pnode_name))
4601

    
4602
    if iobj.disk_template != constants.DT_DISKLESS:
4603
      if self.op.mode == constants.INSTANCE_CREATE:
4604
        feedback_fn("* running the instance OS create scripts...")
4605
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4606
        msg = result.RemoteFailMsg()
4607
        if msg:
4608
          raise errors.OpExecError("Could not add os for instance %s"
4609
                                   " on node %s: %s" %
4610
                                   (instance, pnode_name, msg))
4611

    
4612
      elif self.op.mode == constants.INSTANCE_IMPORT:
4613
        feedback_fn("* running the instance OS import scripts...")
4614
        src_node = self.op.src_node
4615
        src_images = self.src_images
4616
        cluster_name = self.cfg.GetClusterName()
4617
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4618
                                                         src_node, src_images,
4619
                                                         cluster_name)
4620
        import_result.Raise()
4621
        for idx, result in enumerate(import_result.data):
4622
          if not result:
4623
            self.LogWarning("Could not import the image %s for instance"
4624
                            " %s, disk %d, on node %s" %
4625
                            (src_images[idx], instance, idx, pnode_name))
4626
      else:
4627
        # also checked in the prereq part
4628
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4629
                                     % self.op.mode)
4630

    
4631
    if self.op.start:
4632
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4633
      feedback_fn("* starting instance...")
4634
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
4635
      msg = result.RemoteFailMsg()
4636
      if msg:
4637
        raise errors.OpExecError("Could not start instance: %s" % msg)
4638

    
4639

    
4640
class LUConnectConsole(NoHooksLU):
4641
  """Connect to an instance's console.
4642

4643
  This is somewhat special in that it returns the command line that
4644
  you need to run on the master node in order to connect to the
4645
  console.
4646

4647
  """
4648
  _OP_REQP = ["instance_name"]
4649
  REQ_BGL = False
4650

    
4651
  def ExpandNames(self):
4652
    self._ExpandAndLockInstance()
4653

    
4654
  def CheckPrereq(self):
4655
    """Check prerequisites.
4656

4657
    This checks that the instance is in the cluster.
4658

4659
    """
4660
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4661
    assert self.instance is not None, \
4662
      "Cannot retrieve locked instance %s" % self.op.instance_name
4663
    _CheckNodeOnline(self, self.instance.primary_node)
4664

    
4665
  def Exec(self, feedback_fn):
4666
    """Connect to the console of an instance
4667

4668
    """
4669
    instance = self.instance
4670
    node = instance.primary_node
4671

    
4672
    node_insts = self.rpc.call_instance_list([node],
4673
                                             [instance.hypervisor])[node]
4674
    node_insts.Raise()
4675

    
4676
    if instance.name not in node_insts.data:
4677
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4678

    
4679
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4680

    
4681
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4682
    console_cmd = hyper.GetShellCommandForConsole(instance)
4683

    
4684
    # build ssh cmdline
4685
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4686

    
4687

    
4688
class LUReplaceDisks(LogicalUnit):
4689
  """Replace the disks of an instance.
4690

4691
  """
4692
  HPATH = "mirrors-replace"
4693
  HTYPE = constants.HTYPE_INSTANCE
4694
  _OP_REQP = ["instance_name", "mode", "disks"]
4695
  REQ_BGL = False
4696

    
4697
  def CheckArguments(self):
4698
    if not hasattr(self.op, "remote_node"):
4699
      self.op.remote_node = None
4700
    if not hasattr(self.op, "iallocator"):
4701
      self.op.iallocator = None
4702

    
4703
    # check for valid parameter combination
4704
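    # changing the secondary (REPLACE_DISK_CHG) needs exactly one of
    # remote_node/iallocator; the other modes accept neither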
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
4705
    if self.op.mode == constants.REPLACE_DISK_CHG:
4706
      if cnt == 2:
4707
        raise errors.OpPrereqError("When changing the secondary either an"
4708
                                   " iallocator script must be used or the"
4709
                                   " new node given")
4710
      elif cnt == 0:
4711
        raise errors.OpPrereqError("Give either the iallocator or the new"
4712
                                   " secondary, not both")
4713
    else: # not replacing the secondary
4714
      if cnt != 2:
4715
        raise errors.OpPrereqError("The iallocator and new node options can"
4716
                                   " be used only when changing the"
4717
                                   " secondary node")
4718

    
4719
  def ExpandNames(self):
4720
    self._ExpandAndLockInstance()
4721

    
4722
    if self.op.iallocator is not None:
4723
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4724
    elif self.op.remote_node is not None:
4725
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4726
      if remote_node is None:
4727
        raise errors.OpPrereqError("Node '%s' not known" %
4728
                                   self.op.remote_node)
4729
      self.op.remote_node = remote_node
4730
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4731
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4732
    else:
4733
      self.needed_locks[locking.LEVEL_NODE] = []
4734
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4735

    
4736
  def DeclareLocks(self, level):
4737
    # If we're not already locking all nodes in the set we have to declare the
4738
    # instance's primary/secondary nodes.
4739
    if (level == locking.LEVEL_NODE and
4740
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4741
      self._LockInstancesNodes()
4742

    
4743
  def _RunAllocator(self):
4744
    """Compute a new secondary node using an IAllocator.
4745

4746
    """
4747
    ial = IAllocator(self,
4748
                     mode=constants.IALLOCATOR_MODE_RELOC,
4749
                     name=self.op.instance_name,
4750
                     relocate_from=[self.sec_node])
4751

    
4752
    ial.Run(self.op.iallocator)
4753

    
4754
    if not ial.success:
4755
      raise errors.OpPrereqError("Can't compute nodes using"
4756
                                 " iallocator '%s': %s" % (self.op.iallocator,
4757
                                                           ial.info))
4758
    if len(ial.nodes) != ial.required_nodes:
4759
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4760
                                 " of nodes (%s), required %s" %
4761
                                  (self.op.iallocator, len(ial.nodes),
                                   ial.required_nodes))
4762
    self.op.remote_node = ial.nodes[0]
4763
    self.LogInfo("Selected new secondary for the instance: %s",
4764
                 self.op.remote_node)
4765

    
4766
  def BuildHooksEnv(self):
4767
    """Build hooks env.
4768

4769
    This runs on the master, the primary and all the secondaries.
4770

4771
    """
4772
    env = {
4773
      "MODE": self.op.mode,
4774
      "NEW_SECONDARY": self.op.remote_node,
4775
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4776
      }
4777
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4778
    nl = [
4779
      self.cfg.GetMasterNode(),
4780
      self.instance.primary_node,
4781
      ]
4782
    if self.op.remote_node is not None:
4783
      nl.append(self.op.remote_node)
4784
    return env, nl, nl
4785

    
4786
  def CheckPrereq(self):
4787
    """Check prerequisites.
4788

4789
    This checks that the instance is in the cluster.
4790

4791
    """
4792
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4793
    assert instance is not None, \
4794
      "Cannot retrieve locked instance %s" % self.op.instance_name
4795
    self.instance = instance
4796

    
4797
    if instance.disk_template != constants.DT_DRBD8:
4798
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4799
                                 " instances")
4800

    
4801
    if len(instance.secondary_nodes) != 1:
4802
      raise errors.OpPrereqError("The instance has a strange layout,"
4803
                                 " expected one secondary but found %d" %
4804
                                 len(instance.secondary_nodes))
4805

    
4806
    self.sec_node = instance.secondary_nodes[0]
4807

    
4808
    if self.op.iallocator is not None:
4809
      self._RunAllocator()
4810

    
4811
    remote_node = self.op.remote_node
4812
    if remote_node is not None:
4813
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4814
      assert self.remote_node_info is not None, \
4815
        "Cannot retrieve locked node %s" % remote_node
4816
    else:
4817
      self.remote_node_info = None
4818
    if remote_node == instance.primary_node:
4819
      raise errors.OpPrereqError("The specified node is the primary node of"
4820
                                 " the instance.")
4821
    elif remote_node == self.sec_node:
4822
      raise errors.OpPrereqError("The specified node is already the"
4823
                                 " secondary node of the instance.")
4824

    
4825
    if self.op.mode == constants.REPLACE_DISK_PRI:
4826
      n1 = self.tgt_node = instance.primary_node
4827
      n2 = self.oth_node = self.sec_node
4828
    elif self.op.mode == constants.REPLACE_DISK_SEC:
4829
      n1 = self.tgt_node = self.sec_node
4830
      n2 = self.oth_node = instance.primary_node
4831
    elif self.op.mode == constants.REPLACE_DISK_CHG:
4832
      n1 = self.new_node = remote_node
4833
      n2 = self.oth_node = instance.primary_node
4834
      self.tgt_node = self.sec_node
4835
    else:
4836
      raise errors.ProgrammerError("Unhandled disk replace mode")
4837

    
4838
    _CheckNodeOnline(self, n1)
4839
    _CheckNodeOnline(self, n2)
4840

    
4841
    if not self.op.disks:
4842
      self.op.disks = range(len(instance.disks))
4843

    
4844
    for disk_idx in self.op.disks:
4845
      instance.FindDisk(disk_idx)
4846

    
4847
  def _ExecD8DiskOnly(self, feedback_fn):
4848
    """Replace a disk on the primary or secondary for dbrd8.
4849

4850
    The algorithm for replace is quite complicated:
4851

4852
      1. for each disk to be replaced:
4853

4854
        1. create new LVs on the target node with unique names
4855
        1. detach old LVs from the drbd device
4856
        1. rename old LVs to name_replaced.<time_t>
4857
        1. rename new LVs to old LVs
4858
        1. attach the new LVs (with the old names now) to the drbd device
4859

4860
      1. wait for sync across all devices
4861

4862
      1. for each modified disk:
4863

4864
        1. remove old LVs (which have the name name_replaced.<time_t>)
4865

4866
    Failures are not very well handled.
4867

4868
    """
4869
    steps_total = 6
4870
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4871
    instance = self.instance
4872
    iv_names = {}
4873
    vgname = self.cfg.GetVGName()
4874
    # start of work
4875
    cfg = self.cfg
4876
    tgt_node = self.tgt_node
4877
    oth_node = self.oth_node
4878

    
4879
    # Step: check device activation
4880
    self.proc.LogStep(1, steps_total, "check device existence")
4881
    info("checking volume groups")
4882
    my_vg = cfg.GetVGName()
4883
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4884
    if not results:
4885
      raise errors.OpExecError("Can't list volume groups on the nodes")
4886
    for node in oth_node, tgt_node:
4887
      res = results[node]
4888
      if res.failed or not res.data or my_vg not in res.data:
4889
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4890
                                 (my_vg, node))
4891
    for idx, dev in enumerate(instance.disks):
4892
      if idx not in self.op.disks:
4893
        continue
4894
      for node in tgt_node, oth_node:
4895
        info("checking disk/%d on %s" % (idx, node))
4896
        cfg.SetDiskID(dev, node)
4897
        if not self.rpc.call_blockdev_find(node, dev):
4898
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4899
                                   (idx, node))
4900

    
4901
    # Step: check other node consistency
4902
    self.proc.LogStep(2, steps_total, "check peer consistency")
4903
    for idx, dev in enumerate(instance.disks):
4904
      if idx not in self.op.disks:
4905
        continue
4906
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4907
      if not _CheckDiskConsistency(self, dev, oth_node,
4908
                                   oth_node==instance.primary_node):
4909
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4910
                                 " to replace disks on this node (%s)" %
4911
                                 (oth_node, tgt_node))
4912

    
4913
    # Step: create new storage
4914
    self.proc.LogStep(3, steps_total, "allocate new storage")
4915
    for idx, dev in enumerate(instance.disks):
4916
      if idx not in self.op.disks:
4917
        continue
4918
      size = dev.size
4919
      cfg.SetDiskID(dev, tgt_node)
4920
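      # each disk gets a fresh data LV of the original size plus a small
      # (128 MB) metadata LV, created under new unique names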
      lv_names = [".disk%d_%s" % (idx, suf)
4921
                  for suf in ["data", "meta"]]
4922
      names = _GenerateUniqueNames(self, lv_names)
4923
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4924
                             logical_id=(vgname, names[0]))
4925
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4926
                             logical_id=(vgname, names[1]))
4927
      new_lvs = [lv_data, lv_meta]
4928
      old_lvs = dev.children
4929
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4930
      info("creating new local storage on %s for %s" %
4931
           (tgt_node, dev.iv_name))
4932
      # we pass force_create=True to force the LVM creation
4933
      for new_lv in new_lvs:
4934
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4935
                        _GetInstanceInfoText(instance), False)
4936

    
4937
    # Step: for each lv, detach+rename*2+attach
4938
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4939
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4940
      info("detaching %s drbd from local storage" % dev.iv_name)
4941
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4942
      result.Raise()
4943
      if not result.data:
4944
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4945
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4946
      #dev.children = []
4947
      #cfg.Update(instance)
4948

    
4949
      # ok, we created the new LVs, so now we know we have the needed
4950
      # storage; as such, we proceed on the target node to rename
4951
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4952
      # using the assumption that logical_id == physical_id (which in
4953
      # turn is the unique_id on that node)
4954

    
4955
      # FIXME(iustin): use a better name for the replaced LVs
4956
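      # the timestamp suffix keeps the replaced names unique per run, e.g.
      # "<old lv name>_replaced-1234567890" (illustrative value)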
      temp_suffix = int(time.time())
4957
      ren_fn = lambda d, suff: (d.physical_id[0],
4958
                                d.physical_id[1] + "_replaced-%s" % suff)
4959
      # build the rename list based on what LVs exist on the node
4960
      rlist = []
4961
      for to_ren in old_lvs:
4962
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4963
        if not find_res.failed and find_res.data is not None: # device exists
4964
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4965

    
4966
      info("renaming the old LVs on the target node")
4967
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4968
      result.Raise()
4969
      if not result.data:
4970
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4971
      # now we rename the new LVs to the old LVs
4972
      info("renaming the new LVs on the target node")
4973
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4974
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4975
      result.Raise()
4976
      if not result.data:
4977
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4978

    
4979
      for old, new in zip(old_lvs, new_lvs):
4980
        new.logical_id = old.logical_id
4981
        cfg.SetDiskID(new, tgt_node)
4982

    
4983
      for disk in old_lvs:
4984
        disk.logical_id = ren_fn(disk, temp_suffix)
4985
        cfg.SetDiskID(disk, tgt_node)
4986

    
4987
      # now that the new lvs have the old name, we can add them to the device
4988
      info("adding new mirror component on %s" % tgt_node)
4989
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4990
      if result.failed or not result.data:
4991
        for new_lv in new_lvs:
4992
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4993
          if result.failed or not result.data:
4994
            warning("Can't rollback device %s", hint="manually cleanup unused"
4995
                    " logical volumes")
4996
        raise errors.OpExecError("Can't add local storage to drbd")
4997

    
4998
      dev.children = new_lvs
4999
      cfg.Update(instance)
5000

    
5001
    # Step: wait for sync
5002

    
5003
    # this can fail as the old devices are degraded and _WaitForSync
5004
    # does a combined result over all disks, so we don't check its
5005
    # return value
5006
    self.proc.LogStep(5, steps_total, "sync devices")
5007
    _WaitForSync(self, instance, unlock=True)
5008

    
5009
    # so check manually all the devices
5010
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5011
      cfg.SetDiskID(dev, instance.primary_node)
5012
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5013
      if result.failed or result.data[5]:
5014
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
5015

    
5016
    # Step: remove old storage
5017
    self.proc.LogStep(6, steps_total, "removing old storage")
5018
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5019
      info("remove logical volumes for %s" % name)
5020
      for lv in old_lvs:
5021
        cfg.SetDiskID(lv, tgt_node)
5022
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
5023
        if result.failed or not result.data:
5024
          warning("Can't remove old LV", hint="manually remove unused LVs")
5025
          continue
5026

    
5027
  def _ExecD8Secondary(self, feedback_fn):
5028
    """Replace the secondary node for drbd8.
5029

5030
    The algorithm for replace is quite complicated:
5031
      - for all disks of the instance:
5032
        - create new LVs on the new node with same names
5033
        - shutdown the drbd device on the old secondary
5034
        - disconnect the drbd network on the primary
5035
        - create the drbd device on the new secondary
5036
        - network attach the drbd on the primary, using an artifice:
5037
          the drbd code for Attach() will connect to the network if it
5038
          finds a device which is connected to the good local disks but
5039
          not network enabled
5040
      - wait for sync across all devices
5041
      - remove all disks from the old secondary
5042

5043
    Failures are not very well handled.
5044

5045
    """
5046
    steps_total = 6
5047
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5048
    instance = self.instance
5049
    iv_names = {}
5050
    # start of work
5051
    cfg = self.cfg
5052
    old_node = self.tgt_node
5053
    new_node = self.new_node
5054
    pri_node = instance.primary_node
5055
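    # the drbd connect/disconnect RPCs below need the secondary IP of every
    # node involved, keyed by node name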
    nodes_ip = {
5056
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5057
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5058
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5059
      }
5060

    
5061
    # Step: check device activation
5062
    self.proc.LogStep(1, steps_total, "check device existence")
5063
    info("checking volume groups")
5064
    my_vg = cfg.GetVGName()
5065
    results = self.rpc.call_vg_list([pri_node, new_node])
5066
    for node in pri_node, new_node:
5067
      res = results[node]
5068
      if res.failed or not res.data or my_vg not in res.data:
5069
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5070
                                 (my_vg, node))
5071
    for idx, dev in enumerate(instance.disks):
5072
      if idx not in self.op.disks:
5073
        continue
5074
      info("checking disk/%d on %s" % (idx, pri_node))
5075
      cfg.SetDiskID(dev, pri_node)
5076
      result = self.rpc.call_blockdev_find(pri_node, dev)
5077
      result.Raise()
5078
      if not result.data:
5079
        raise errors.OpExecError("Can't find disk/%d on node %s" %
5080
                                 (idx, pri_node))
5081

    
5082
    # Step: check other node consistency
5083
    self.proc.LogStep(2, steps_total, "check peer consistency")
5084
    for idx, dev in enumerate(instance.disks):
5085
      if idx not in self.op.disks:
5086
        continue
5087
      info("checking disk/%d consistency on %s" % (idx, pri_node))
5088
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5089
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
5090
                                 " unsafe to replace the secondary" %
5091
                                 pri_node)
5092

    
5093
    # Step: create new storage
5094
    self.proc.LogStep(3, steps_total, "allocate new storage")
5095
    for idx, dev in enumerate(instance.disks):
5096
      info("adding new local storage on %s for disk/%d" %
5097
           (new_node, idx))
5098
      # we pass force_create=True to force LVM creation
5099
      for new_lv in dev.children:
5100
        _CreateBlockDev(self, new_node, instance, new_lv, True,
5101
                        _GetInstanceInfoText(instance), False)
5102

    
5103
    # Step 4: drbd minors and drbd setup changes
5104
    # after this, we must manually remove the drbd minors on both the
5105
    # error and the success paths
5106
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5107
                                   instance.name)
5108
    logging.debug("Allocated minors %s" % (minors,))
5109
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
5110
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5111
      size = dev.size
5112
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5113
      # create new devices on new_node; note that we create two IDs:
5114
      # one without port, so the drbd will be activated without
5115
      # networking information on the new node at this stage, and one
5116
      # with network, for the later activation in step 4
5117
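      # a drbd8 logical_id is (node_a, node_b, port, minor_a, minor_b, secret)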
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5118
      if pri_node == o_node1:
5119
        p_minor = o_minor1
5120
      else:
5121
        p_minor = o_minor2
5122

    
5123
      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5124
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5125

    
5126
      iv_names[idx] = (dev, dev.children, new_net_id)
5127
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5128
                    new_net_id)
5129
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5130
                              logical_id=new_alone_id,
5131
                              children=dev.children)
5132
      try:
5133
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5134
                              _GetInstanceInfoText(instance), False)
5135
      except errors.BlockDeviceError:
5136
        self.cfg.ReleaseDRBDMinors(instance.name)
5137
        raise
5138

    
5139
    for idx, dev in enumerate(instance.disks):
5140
      # we have new devices, shutdown the drbd on the old secondary
5141
      info("shutting down drbd for disk/%d on old node" % idx)
5142
      cfg.SetDiskID(dev, old_node)
5143
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
5144
      if result.failed or not result.data:
5145
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5146
                hint="Please cleanup this device manually as soon as possible")
5147

    
5148
    info("detaching primary drbds from the network (=> standalone)")
5149
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5150
                                               instance.disks)[pri_node]
5151

    
5152
    msg = result.RemoteFailMsg()
5153
    if msg:
5154
      # detaches didn't succeed (unlikely)
5155
      self.cfg.ReleaseDRBDMinors(instance.name)
5156
      raise errors.OpExecError("Can't detach the disks from the network on"
5157
                               " old node: %s" % (msg,))
5158

    
5159
    # if we managed to detach at least one, we update all the disks of
5160
    # the instance to point to the new secondary
5161
    info("updating instance configuration")
5162
    for dev, _, new_logical_id in iv_names.itervalues():
5163
      dev.logical_id = new_logical_id
5164
      cfg.SetDiskID(dev, pri_node)
5165
    cfg.Update(instance)
5166
    # we can remove now the temp minors as now the new values are
5167
    # written to the config file (and therefore stable)
5168
    self.cfg.ReleaseDRBDMinors(instance.name)
5169

    
5170
    # and now perform the drbd attach
5171
    info("attaching primary drbds to new secondary (standalone => connected)")
5172
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5173
                                           instance.disks, instance.name,
5174
                                           False)
5175
    for to_node, to_result in result.items():
5176
      msg = to_result.RemoteFailMsg()
5177
      if msg:
5178
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
5179
                hint="please do a gnt-instance info to see the"
5180
                " status of disks")
5181

    
5182
    # this can fail as the old devices are degraded and _WaitForSync
5183
    # does a combined result over all disks, so we don't check its
5184
    # return value
5185
    self.proc.LogStep(5, steps_total, "sync devices")
5186
    _WaitForSync(self, instance, unlock=True)
5187

    
5188
    # so check manually all the devices
5189
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5190
      cfg.SetDiskID(dev, pri_node)
5191
      result = self.rpc.call_blockdev_find(pri_node, dev)
5192
      result.Raise()
5193
      if result.data[5]:
5194
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5195

    
5196
    self.proc.LogStep(6, steps_total, "removing old storage")
5197
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5198
      info("remove logical volumes for disk/%d" % idx)
5199
      for lv in old_lvs:
5200
        cfg.SetDiskID(lv, old_node)
5201
        result = self.rpc.call_blockdev_remove(old_node, lv)
5202
        if result.failed or not result.data:
5203
          warning("Can't remove LV on old secondary",
5204
                  hint="Cleanup stale volumes by hand")
5205

    
5206
  def Exec(self, feedback_fn):
5207
    """Execute disk replacement.
5208

5209
    This dispatches the disk replacement to the appropriate handler.
5210

5211
    """
5212
    instance = self.instance
5213

    
5214
    # Activate the instance disks if we're replacing them on a down instance
5215
    if instance.status == "down":
5216
      _StartInstanceDisks(self, instance, True)
5217

    
5218
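    # a secondary change rebuilds the disks on a new node; the other modes
    # only swap the local LVs in place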
    if self.op.mode == constants.REPLACE_DISK_CHG:
5219
      fn = self._ExecD8Secondary
5220
    else:
5221
      fn = self._ExecD8DiskOnly
5222

    
5223
    ret = fn(feedback_fn)
5224

    
5225
    # Deactivate the instance disks if we're replacing them on a down instance
5226
    if instance.status == "down":
5227
      _SafeShutdownInstanceDisks(self, instance)
5228

    
5229
    return ret
5230

    
5231

    
5232
class LUGrowDisk(LogicalUnit):
5233
  """Grow a disk of an instance.
5234

5235
  """
5236
  HPATH = "disk-grow"
5237
  HTYPE = constants.HTYPE_INSTANCE
5238
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5239
  REQ_BGL = False
5240

    
5241
  def ExpandNames(self):
5242
    self._ExpandAndLockInstance()
5243
    self.needed_locks[locking.LEVEL_NODE] = []
5244
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5245

    
5246
  def DeclareLocks(self, level):
5247
    if level == locking.LEVEL_NODE:
5248
      self._LockInstancesNodes()
5249

    
5250
  def BuildHooksEnv(self):
5251
    """Build hooks env.
5252

5253
    This runs on the master, the primary and all the secondaries.
5254

5255
    """
5256
    env = {
5257
      "DISK": self.op.disk,
5258
      "AMOUNT": self.op.amount,
5259
      }
5260
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5261
    nl = [
5262
      self.cfg.GetMasterNode(),
5263
      self.instance.primary_node,
5264
      ]
5265
    return env, nl, nl
5266

    
5267
  def CheckPrereq(self):
5268
    """Check prerequisites.
5269

5270
    This checks that the instance is in the cluster.
5271

5272
    """
5273
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5274
    assert instance is not None, \
5275
      "Cannot retrieve locked instance %s" % self.op.instance_name
5276
    nodenames = list(instance.all_nodes)
5277
    for node in nodenames:
5278
      _CheckNodeOnline(self, node)
5279

    
5280

    
5281
    self.instance = instance
5282

    
5283
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5284
      raise errors.OpPrereqError("Instance's disk layout does not support"
5285
                                 " growing.")
5286

    
5287
    self.disk = instance.FindDisk(self.op.disk)
5288

    
5289
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5290
                                       instance.hypervisor)
5291
    for node in nodenames:
5292
      info = nodeinfo[node]
5293
      if info.failed or not info.data:
5294
        raise errors.OpPrereqError("Cannot get current information"
5295
                                   " from node '%s'" % node)
5296
      vg_free = info.data.get('vg_free', None)
5297
      if not isinstance(vg_free, int):
5298
        raise errors.OpPrereqError("Can't compute free disk space on"
5299
                                   " node %s" % node)
5300
      if self.op.amount > vg_free:
5301
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
5302
                                   " %d MiB available, %d MiB required" %
5303
                                   (node, vg_free, self.op.amount))
5304

    
5305
  def Exec(self, feedback_fn):
5306
    """Execute disk grow.
5307

5308
    """
5309
    instance = self.instance
5310
    disk = self.disk
5311
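    # grow the block device on every node that holds it before recording the
    # new size in the configuration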
    for node in instance.all_nodes:
5312
      self.cfg.SetDiskID(disk, node)
5313
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5314
      result.Raise()
5315
      if (not result.data or not isinstance(result.data, (list, tuple)) or
5316
          len(result.data) != 2):
5317
        raise errors.OpExecError("Grow request failed to node %s" % node)
5318
      elif not result.data[0]:
5319
        raise errors.OpExecError("Grow request failed to node %s: %s" %
5320
                                 (node, result.data[1]))
5321
    disk.RecordGrow(self.op.amount)
5322
    self.cfg.Update(instance)
5323
    if self.op.wait_for_sync:
5324
      disk_abort = not _WaitForSync(self, instance)
5325
      if disk_abort:
5326
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5327
                             " status.\nPlease check the instance.")
5328

    
5329

    
5330
class LUQueryInstanceData(NoHooksLU):
5331
  """Query runtime instance data.
5332

5333
  """
5334
  _OP_REQP = ["instances", "static"]
5335
  REQ_BGL = False
5336

    
5337
  def ExpandNames(self):
5338
    self.needed_locks = {}
5339
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5340

    
5341
    if not isinstance(self.op.instances, list):
5342
      raise errors.OpPrereqError("Invalid argument type 'instances'")
5343

    
5344
    if self.op.instances:
5345
      self.wanted_names = []
5346
      for name in self.op.instances:
5347
        full_name = self.cfg.ExpandInstanceName(name)
5348
        if full_name is None:
5349
          raise errors.OpPrereqError("Instance '%s' not known" % name)
5350
        self.wanted_names.append(full_name)
5351
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5352
    else:
5353
      self.wanted_names = None
5354
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5355

    
5356
    self.needed_locks[locking.LEVEL_NODE] = []
5357
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5358

    
5359
  def DeclareLocks(self, level):
5360
    if level == locking.LEVEL_NODE:
5361
      self._LockInstancesNodes()
5362

    
5363
  def CheckPrereq(self):
5364
    """Check prerequisites.
5365

5366
    This only checks the optional instance list against the existing names.
5367

5368
    """
5369
    if self.wanted_names is None:
5370
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5371

    
5372
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5373
                             in self.wanted_names]
5374
    return
5375

    
5376
  def _ComputeDiskStatus(self, instance, snode, dev):
5377
    """Compute block device status.
5378

5379
    """
5380
    static = self.op.static
5381
    if not static:
5382
      self.cfg.SetDiskID(dev, instance.primary_node)
5383
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5384
      dev_pstatus.Raise()
5385
      dev_pstatus = dev_pstatus.data
5386
    else:
5387
      dev_pstatus = None
5388

    
5389
    if dev.dev_type in constants.LDS_DRBD:
5390
      # we change the snode then (otherwise we use the one passed in)
5391
      if dev.logical_id[0] == instance.primary_node:
5392
        snode = dev.logical_id[1]
5393
      else:
5394
        snode = dev.logical_id[0]
5395

    
5396
    if snode and not static:
5397
      self.cfg.SetDiskID(dev, snode)
5398
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5399
      dev_sstatus.Raise()
5400
      dev_sstatus = dev_sstatus.data
5401
    else:
5402
      dev_sstatus = None
5403

    
5404
    if dev.children:
5405
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
5406
                      for child in dev.children]
5407
    else:
5408
      dev_children = []
5409

    
5410
    data = {
5411
      "iv_name": dev.iv_name,
5412
      "dev_type": dev.dev_type,
5413
      "logical_id": dev.logical_id,
5414
      "physical_id": dev.physical_id,
5415
      "pstatus": dev_pstatus,
5416
      "sstatus": dev_sstatus,
5417
      "children": dev_children,
5418
      "mode": dev.mode,
5419
      }
5420

    
5421
    return data
5422

    
5423
  def Exec(self, feedback_fn):
5424
    """Gather and return data"""
5425
    result = {}
5426

    
5427
    cluster = self.cfg.GetClusterInfo()
5428

    
5429
    for instance in self.wanted_instances:
5430
      if not self.op.static:
5431
        remote_info = self.rpc.call_instance_info(instance.primary_node,
5432
                                                  instance.name,
5433
                                                  instance.hypervisor)
5434
        remote_info.Raise()
5435
        remote_info = remote_info.data
5436
        if remote_info and "state" in remote_info:
5437
          remote_state = "up"
5438
        else:
5439
          remote_state = "down"
5440
      else:
5441
        remote_state = None
5442
      if instance.status == "down":
5443
        config_state = "down"
5444
      else:
5445
        config_state = "up"
5446

    
5447
      disks = [self._ComputeDiskStatus(instance, None, device)
5448
               for device in instance.disks]
5449

    
5450
      idict = {
5451
        "name": instance.name,
5452
        "config_state": config_state,
5453
        "run_state": remote_state,
5454
        "pnode": instance.primary_node,
5455
        "snodes": instance.secondary_nodes,
5456
        "os": instance.os,
5457
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5458
        "disks": disks,
5459
        "hypervisor": instance.hypervisor,
5460
        "network_port": instance.network_port,
5461
        "hv_instance": instance.hvparams,
5462
        "hv_actual": cluster.FillHV(instance),
5463
        "be_instance": instance.beparams,
5464
        "be_actual": cluster.FillBE(instance),
5465
        }
5466

    
5467
      result[instance.name] = idict
5468

    
5469
    return result
5470

    
5471

    
5472
class LUSetInstanceParams(LogicalUnit):
5473
  """Modifies an instances's parameters.
5474

5475
  """
5476
  HPATH = "instance-modify"
5477
  HTYPE = constants.HTYPE_INSTANCE
5478
  _OP_REQP = ["instance_name"]
5479
  REQ_BGL = False
5480

    
5481
  def CheckArguments(self):
5482
    if not hasattr(self.op, 'nics'):
5483
      self.op.nics = []
5484
    if not hasattr(self.op, 'disks'):
5485
      self.op.disks = []
5486
    if not hasattr(self.op, 'beparams'):
5487
      self.op.beparams = {}
5488
    if not hasattr(self.op, 'hvparams'):
5489
      self.op.hvparams = {}
5490
    self.op.force = getattr(self.op, "force", False)
5491
    if not (self.op.nics or self.op.disks or
5492
            self.op.hvparams or self.op.beparams):
5493
      raise errors.OpPrereqError("No changes submitted")
5494

    
5495
    utils.CheckBEParams(self.op.beparams)
5496

    
5497
    # Disk validation
5498
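    # each entry is (op, params): op is constants.DDM_ADD, DDM_REMOVE or the
    # index of an existing disk; e.g. [(constants.DDM_ADD, {"size": 1024})]
    # would add a 1024 MB disk (illustrative example)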
    disk_addremove = 0
5499
    for disk_op, disk_dict in self.op.disks:
5500
      if disk_op == constants.DDM_REMOVE:
5501
        disk_addremove += 1
5502
        continue
5503
      elif disk_op == constants.DDM_ADD:
5504
        disk_addremove += 1
5505
      else:
5506
        if not isinstance(disk_op, int):
5507
          raise errors.OpPrereqError("Invalid disk index")
5508
      if disk_op == constants.DDM_ADD:
5509
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5510
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5511
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5512
        size = disk_dict.get('size', None)
5513
        if size is None:
5514
          raise errors.OpPrereqError("Required disk parameter size missing")
5515
        try:
5516
          size = int(size)
5517
        except ValueError, err:
5518
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5519
                                     str(err))
5520
        disk_dict['size'] = size
5521
      else:
5522
        # modification of disk
5523
        if 'size' in disk_dict:
5524
          raise errors.OpPrereqError("Disk size change not possible, use"
5525
                                     " grow-disk")
5526

    
5527
    if disk_addremove > 1:
5528
      raise errors.OpPrereqError("Only one disk add or remove operation"
5529
                                 " supported at a time")
5530

    
5531
    # NIC validation
5532
    nic_addremove = 0
5533
    for nic_op, nic_dict in self.op.nics:
5534
      if nic_op == constants.DDM_REMOVE:
5535
        nic_addremove += 1
5536
        continue
5537
      elif nic_op == constants.DDM_ADD:
5538
        nic_addremove += 1
5539
      else:
5540
        if not isinstance(nic_op, int):
5541
          raise errors.OpPrereqError("Invalid nic index")
5542

    
5543
      # nic_dict should be a dict
5544
      nic_ip = nic_dict.get('ip', None)
5545
      if nic_ip is not None:
5546
        if nic_ip.lower() == "none":
5547
          nic_dict['ip'] = None
5548
        else:
5549
          if not utils.IsValidIP(nic_ip):
5550
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5551
      # we can only check None bridges and assign the default one
5552
      nic_bridge = nic_dict.get('bridge', None)
5553
      if nic_bridge is None:
5554
        nic_dict['bridge'] = self.cfg.GetDefBridge()
5555
      # but we can validate MACs
5556
      nic_mac = nic_dict.get('mac', None)
5557
      if nic_mac is not None:
5558
        if self.cfg.IsMacInUse(nic_mac):
5559
          raise errors.OpPrereqError("MAC address %s already in use"
5560
                                     " in cluster" % nic_mac)
5561
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5562
          if not utils.IsValidMac(nic_mac):
5563
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5564
    if nic_addremove > 1:
5565
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5566
                                 " supported at a time")
5567

    
5568
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # FIXME: readd disk/nic changes
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

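  # CheckPrereq below merges the submitted hvparams/beparams with the
  # instance and cluster values.  As an illustration (the parameter name is
  # only a placeholder), {'some_hv_param': constants.VALUE_DEFAULT} drops the
  # per-instance override so that the cluster default applies again, while
  # for hvparams constants.VALUE_NONE stores an explicit None value.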
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the requested changes against the current instance
    configuration and the cluster-wide parameters.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        elif val == constants.VALUE_NONE:
          i_hvdict[key] = None
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = instance_info.data['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is not None:
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

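  # Exec below returns a list of (feedback name, new value) pairs describing
  # the changes that were applied, for example (illustrative values only):
  #   [("disk/1", "add:size=1024,mode=rw"),
  #    ("nic.mac/0", "00:11:22:33:44:55"),
  #    ("be/memory", 512)]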
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          rpc_result = self.rpc.call_blockdev_remove(node, disk)
          if rpc_result.failed or not rpc_result.data:
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                 " continuing anyway", device_idx, node)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        new_disk.mode = disk_dict['mode']
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # add a new nic
        if 'mac' not in nic_dict:
          mac = constants.VALUE_GENERATE
        else:
          mac = nic_dict['mac']
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          mac = self.cfg.GenerateMAC()
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=nic_dict.get('bridge', None))
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

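  # The dictionary built by Exec below maps each queried node name either to
  # its list of export names or to False when the RPC to that node failed,
  # e.g. (node and instance names are invented):
  #   {"node1.example.com": ["inst1.example.com"],
  #    "node2.example.com": False}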
  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This means the node name is wrong, not that the node is not locked
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        result = self.rpc.call_instance_start(src_node, instance, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        result = self.rpc.call_blockdev_remove(src_node, dev)
        if result.failed or not result.data:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of matching (path, tag) pairs.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _ALLO_KEYS or _RELO_KEYS class
      attributes, depending on the mode, are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        "offline": ninfo.offline,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": list(iinfo.all_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

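  # A "request" entry as built by _AddNewInstance above could look roughly
  # like this (every concrete value below is invented for illustration):
  #   {"type": "allocate", "name": "inst1.example.com",
  #    "disk_template": constants.DT_DRBD8, "tags": [], "os": "debian-etch",
  #    "vcpus": 1, "memory": 512, "nics": [...],
  #    "disks": [{"size": 1024, "mode": "w"}, {"size": 1024, "mode": "w"}],
  #    "disk_space_total": ..., "required_nodes": 2}
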
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

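  # _ValidateResult below expects the allocator's stdout to be a serialized
  # dict providing at least "success", "info" and "nodes", for instance
  # (illustrative values only):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node3.example.com"]}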
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result