root / lib / cmdlib.py @ cbfc4681


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing it separately is better because:
118

119
      - ExpandNames is left as purely a lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods no longer need to worry about missing parameters.
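
    Example (an illustrative sketch only; the 'force' parameter is
    hypothetical and not one defined by this module)::

      def CheckArguments(self):
        if not hasattr(self.op, "force"):
          self.op.force = False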
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, etc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
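      # Illustrative addition (not in the original examples): acquire all
      # node locks, but in shared rather than exclusive mode
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
      self.share_locks[locking.LEVEL_NODE] = 1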
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
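
    Example (an illustrative sketch, mirroring the usage described in
    L{_LockInstancesNodes}; not taken from a real LU)::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()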
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_', as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If there are no nodes, an empty list (and not None) should be returned.
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
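
    Example (an illustrative sketch only, not taken from a real LU; the
    environment keys and node lists shown are placeholders)::

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.op.instance_name}
        nl = [self.cfg.GetMasterNode()]
        return env, nl, nl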
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged, but any LU can override it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
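
    Example (an illustrative sketch only, not taken from a real LU)::

      def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
        if phase == constants.HOOKS_PHASE_POST and not hook_results:
          feedback_fn("  - WARNING: no hook results returned")
        return lu_result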
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
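
  # Illustrative usage sketch (hypothetical instance-level LU, not part of
  # the original module):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE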
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instances' nodes, or
295
    to just lock primaries or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
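
# Illustrative usage sketch (hypothetical opcode attribute, not part of the
# original module): a node-level LU would typically call, from CheckPrereq,
#   self.op.nodes = _GetWantedNodes(self, self.op.nodes)
# after which self.op.nodes holds the expanded, sorted node names.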
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = lu.cfg.GetInstanceList()
396
  return utils.NiceSort(wanted)
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
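
# Illustrative usage sketch (hypothetical field names, not part of the
# original module):
#   _CheckOutputFields(static=utils.FieldSet("name"),
#                      dynamic=utils.FieldSet("mfree"),
#                      selected=self.op.output_fields)
# raises errors.OpPrereqError if output_fields contains anything other than
# "name" or "mfree".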
416

    
417

    
418
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419
                          memory, vcpus, nics):
420
  """Builds instance related env variables for hooks
421

422
  This builds the hook environment from individual variables.
423

424
  @type name: string
425
  @param name: the name of the instance
426
  @type primary_node: string
427
  @param primary_node: the name of the instance's primary node
428
  @type secondary_nodes: list
429
  @param secondary_nodes: list of secondary nodes as strings
430
  @type os_type: string
431
  @param os_type: the name of the instance's OS
432
  @type status: string
433
  @param status: the desired status of the instance
434
  @type memory: string
435
  @param memory: the memory size of the instance
436
  @type vcpus: string
437
  @param vcpus: the count of VCPUs the instance has
438
  @type nics: list
439
  @param nics: list of tuples (ip, bridge, mac) representing
440
      the NICs the instance has
441
  @rtype: dict
442
  @return: the hook environment for this instance
443

444
  """
445
  env = {
446
    "OP_TARGET": name,
447
    "INSTANCE_NAME": name,
448
    "INSTANCE_PRIMARY": primary_node,
449
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450
    "INSTANCE_OS_TYPE": os_type,
451
    "INSTANCE_STATUS": status,
452
    "INSTANCE_MEMORY": memory,
453
    "INSTANCE_VCPUS": vcpus,
454
  }
455

    
456
  if nics:
457
    nic_count = len(nics)
458
    for idx, (ip, bridge, mac) in enumerate(nics):
459
      if ip is None:
460
        ip = ""
461
      env["INSTANCE_NIC%d_IP" % idx] = ip
462
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
464
  else:
465
    nic_count = 0
466

    
467
  env["INSTANCE_NIC_COUNT"] = nic_count
468

    
469
  return env
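
# Illustrative call sketch (hypothetical values, not part of the original
# module):
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com", [],
#                         "debian-etch", "up", 128, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
# returns a dict containing INSTANCE_NAME, INSTANCE_PRIMARY,
# INSTANCE_NIC_COUNT=1 and INSTANCE_NIC0_IP/_BRIDGE/_HWADDR, among others.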
470

    
471

    
472
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473
  """Builds instance related env variables for hooks from an object.
474

475
  @type lu: L{LogicalUnit}
476
  @param lu: the logical unit on whose behalf we execute
477
  @type instance: L{objects.Instance}
478
  @param instance: the instance for which we should build the
479
      environment
480
  @type override: dict
481
  @param override: dictionary with key/values that will override
482
      our values
483
  @rtype: dict
484
  @return: the hook environment dictionary
485

486
  """
487
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
488
  args = {
489
    'name': instance.name,
490
    'primary_node': instance.primary_node,
491
    'secondary_nodes': instance.secondary_nodes,
492
    'os_type': instance.os,
493
    'status': instance.status,
494
    'memory': bep[constants.BE_MEMORY],
495
    'vcpus': bep[constants.BE_VCPUS],
496
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497
  }
498
  if override:
499
    args.update(override)
500
  return _BuildInstanceHookEnv(**args)
501

    
502

    
503
def _CheckInstanceBridgesExist(lu, instance):
504
  """Check that the bridges needed by an instance exist.
505

506
  """
507
  # check bridges existence
508
  brlist = [nic.bridge for nic in instance.nics]
509
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
510
  result.Raise()
511
  if not result.data:
512
    raise errors.OpPrereqError("One or more target bridges %s do not"
513
                               " exist on destination node '%s'" %
514
                               (brlist, instance.primary_node))
515

    
516

    
517
class LUDestroyCluster(NoHooksLU):
518
  """Logical unit for destroying the cluster.
519

520
  """
521
  _OP_REQP = []
522

    
523
  def CheckPrereq(self):
524
    """Check prerequisites.
525

526
    This checks whether the cluster is empty.
527

528
    Any errors are signalled by raising errors.OpPrereqError.
529

530
    """
531
    master = self.cfg.GetMasterNode()
532

    
533
    nodelist = self.cfg.GetNodeList()
534
    if len(nodelist) != 1 or nodelist[0] != master:
535
      raise errors.OpPrereqError("There are still %d node(s) in"
536
                                 " this cluster." % (len(nodelist) - 1))
537
    instancelist = self.cfg.GetInstanceList()
538
    if instancelist:
539
      raise errors.OpPrereqError("There are still %d instance(s) in"
540
                                 " this cluster." % len(instancelist))
541

    
542
  def Exec(self, feedback_fn):
543
    """Destroys the cluster.
544

545
    """
546
    master = self.cfg.GetMasterNode()
547
    result = self.rpc.call_node_stop_master(master, False)
548
    result.Raise()
549
    if not result.data:
550
      raise errors.OpExecError("Could not disable the master role")
551
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552
    utils.CreateBackup(priv_key)
553
    utils.CreateBackup(pub_key)
554
    return master
555

    
556

    
557
class LUVerifyCluster(LogicalUnit):
558
  """Verifies the cluster status.
559

560
  """
561
  HPATH = "cluster-verify"
562
  HTYPE = constants.HTYPE_CLUSTER
563
  _OP_REQP = ["skip_checks"]
564
  REQ_BGL = False
565

    
566
  def ExpandNames(self):
567
    self.needed_locks = {
568
      locking.LEVEL_NODE: locking.ALL_SET,
569
      locking.LEVEL_INSTANCE: locking.ALL_SET,
570
    }
571
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
572

    
573
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574
                  node_result, feedback_fn, master_files):
575
    """Run multiple tests against a node.
576

577
    Test list:
578

579
      - compares ganeti version
580
      - checks vg existence and size > 20G
581
      - checks config file checksum
582
      - checks ssh to other nodes
583

584
    @type nodeinfo: L{objects.Node}
585
    @param nodeinfo: the node to check
586
    @param file_list: required list of files
587
    @param local_cksum: dictionary of local files and their checksums
588
    @param node_result: the results from the node
589
    @param feedback_fn: function used to accumulate results
590
    @param master_files: list of files that only masters should have
591

592
    """
593
    node = nodeinfo.name
594

    
595
    # main result, node_result should be a non-empty dict
596
    if not node_result or not isinstance(node_result, dict):
597
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
598
      return True
599

    
600
    # compares ganeti version
601
    local_version = constants.PROTOCOL_VERSION
602
    remote_version = node_result.get('version', None)
603
    if not remote_version:
604
      feedback_fn("  - ERROR: connection to %s failed" % (node))
605
      return True
606

    
607
    if local_version != remote_version:
608
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
609
                      (local_version, node, remote_version))
610
      return True
611

    
612
    # checks vg existence and size > 20G
613

    
614
    bad = False
615
    vglist = node_result.get(constants.NV_VGLIST, None)
616
    if not vglist:
617
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618
                      (node,))
619
      bad = True
620
    else:
621
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622
                                            constants.MIN_VG_SIZE)
623
      if vgstatus:
624
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
625
        bad = True
626

    
627
    # checks config file checksum
628

    
629
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
630
    if not isinstance(remote_cksum, dict):
631
      bad = True
632
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
633
    else:
634
      for file_name in file_list:
635
        node_is_mc = nodeinfo.master_candidate
636
        must_have_file = file_name not in master_files
637
        if file_name not in remote_cksum:
638
          if node_is_mc or must_have_file:
639
            bad = True
640
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
641
        elif remote_cksum[file_name] != local_cksum[file_name]:
642
          if node_is_mc or must_have_file:
643
            bad = True
644
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
645
          else:
646
            # not candidate and this is not a must-have file
647
            bad = True
648
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
649
                        " '%s'" % file_name)
650
        else:
651
          # all good, except non-master/non-must have combination
652
          if not node_is_mc and not must_have_file:
653
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
654
                        " candidates" % file_name)
655

    
656
    # checks ssh to other nodes
657

    
658
    if constants.NV_NODELIST not in node_result:
659
      bad = True
660
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
661
    else:
662
      if node_result[constants.NV_NODELIST]:
663
        bad = True
664
        for node in node_result[constants.NV_NODELIST]:
665
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
666
                          (node, node_result[constants.NV_NODELIST][node]))
667

    
668
    if constants.NV_NODENETTEST not in node_result:
669
      bad = True
670
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
671
    else:
672
      if node_result[constants.NV_NODENETTEST]:
673
        bad = True
674
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
675
        for node in nlist:
676
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
677
                          (node, node_result[constants.NV_NODENETTEST][node]))
678

    
679
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680
    if isinstance(hyp_result, dict):
681
      for hv_name, hv_result in hyp_result.iteritems():
682
        if hv_result is not None:
683
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
684
                      (hv_name, hv_result))
685
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if instanceconfig.status != 'down':
710
      if (node_current not in node_instance or
711
          instance not in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if node != node_current:
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758
    """Verify N+1 Memory Resilience.
759

760
    Check that if one single node dies we can still start all the instances it
761
    was primary for.
762

763
    """
764
    bad = False
765

    
766
    for node, nodeinfo in node_info.iteritems():
767
      # This code checks that every node which is now listed as secondary has
768
      # enough memory to host all instances it is supposed to, should a single
769
      # other node in the cluster fail.
770
      # FIXME: not ready for failover to an arbitrary node
771
      # FIXME: does not support file-backed instances
772
      # WARNING: we currently take into account down instances as well as up
773
      # ones, considering that even if they're down someone might want to start
774
      # them even in the event of a node failure.
775
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776
        needed_mem = 0
777
        for instance in instances:
778
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779
          if bep[constants.BE_AUTO_BALANCE]:
780
            needed_mem += bep[constants.BE_MEMORY]
781
        if nodeinfo['mfree'] < needed_mem:
782
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
783
                      " failovers should node %s fail" % (node, prinode))
784
          bad = True
785
    return bad
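
  # Illustrative data shape for the check above (hypothetical names, not
  # part of the original module): if nodeC is secondary for inst1 (primary
  # nodeA) and inst2 (primary nodeB), then
  #   node_info["nodeC"]["sinst-by-pnode"] == {"nodeA": ["inst1"],
  #                                            "nodeB": ["inst2"]}
  # and the check verifies that node_info["nodeC"]["mfree"] covers the
  # summed BE_MEMORY of the auto-balanced instances in each of those lists.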
786

    
787
  def CheckPrereq(self):
788
    """Check prerequisites.
789

790
    Transform the list of checks we're going to skip into a set and check that
791
    all its members are valid.
792

793
    """
794
    self.skip_set = frozenset(self.op.skip_checks)
795
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
797

    
798
  def BuildHooksEnv(self):
799
    """Build hooks env.
800

801
    Cluster-Verify hooks just run in the post phase and their failure causes
802
    the output to be logged in the verify output and the verification to fail.
803

804
    """
805
    all_nodes = self.cfg.GetNodeList()
806
    # TODO: populate the environment with useful information for verify hooks
807
    env = {}
808
    return env, [], all_nodes
809

    
810
  def Exec(self, feedback_fn):
811
    """Verify integrity of cluster, performing various tests on nodes.
812

813
    """
814
    bad = False
815
    feedback_fn("* Verifying global settings")
816
    for msg in self.cfg.VerifyConfig():
817
      feedback_fn("  - ERROR: %s" % msg)
818

    
819
    vg_name = self.cfg.GetVGName()
820
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
822
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824
    i_non_redundant = [] # Non redundant instances
825
    i_non_a_balanced = [] # Non auto-balanced instances
826
    node_volume = {}
827
    node_instance = {}
828
    node_info = {}
829
    instance_cfg = {}
830

    
831
    # FIXME: verify OS list
832
    # do local checksums
833
    master_files = [constants.CLUSTER_CONF_FILE]
834

    
835
    file_names = ssconf.SimpleStore().GetFileList()
836
    file_names.append(constants.SSL_CERT_FILE)
837
    file_names.extend(master_files)
838

    
839
    local_checksums = utils.FingerprintFiles(file_names)
840

    
841
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842
    node_verify_param = {
843
      constants.NV_FILELIST: file_names,
844
      constants.NV_NODELIST: nodelist,
845
      constants.NV_HYPERVISOR: hypervisors,
846
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847
                                  node.secondary_ip) for node in nodeinfo],
848
      constants.NV_LVLIST: vg_name,
849
      constants.NV_INSTANCELIST: hypervisors,
850
      constants.NV_VGLIST: None,
851
      constants.NV_VERSION: None,
852
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
853
      }
854
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855
                                           self.cfg.GetClusterName())
856

    
857
    cluster = self.cfg.GetClusterInfo()
858
    master_node = self.cfg.GetMasterNode()
859
    for node_i in nodeinfo:
860
      node = node_i.name
861
      nresult = all_nvinfo[node].data
862

    
863
      if node == master_node:
864
        ntype = "master"
865
      elif node_i.master_candidate:
866
        ntype = "master candidate"
867
      else:
868
        ntype = "regular"
869
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
870

    
871
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
872
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
873
        bad = True
874
        continue
875

    
876
      result = self._VerifyNode(node_i, file_names, local_checksums,
877
                                nresult, feedback_fn, master_files)
878
      bad = bad or result
879

    
880
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881
      if isinstance(lvdata, basestring):
882
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
883
                    (node, lvdata.encode('string_escape')))
884
        bad = True
885
        node_volume[node] = {}
886
      elif not isinstance(lvdata, dict):
887
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
888
        bad = True
889
        continue
890
      else:
891
        node_volume[node] = lvdata
892

    
893
      # node_instance
894
      idata = nresult.get(constants.NV_INSTANCELIST, None)
895
      if not isinstance(idata, list):
896
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
897
                    (node,))
898
        bad = True
899
        continue
900

    
901
      node_instance[node] = idata
902

    
903
      # node_info
904
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
905
      if not isinstance(nodeinfo, dict):
906
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
907
        bad = True
908
        continue
909

    
910
      try:
911
        node_info[node] = {
912
          "mfree": int(nodeinfo['memory_free']),
913
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
914
          "pinst": [],
915
          "sinst": [],
916
          # dictionary holding all instances this node is secondary for,
917
          # grouped by their primary node. Each key is a cluster node, and each
918
          # value is a list of instances which have the key as primary and the
919
          # current node as secondary.  this is handy to calculate N+1 memory
920
          # availability if you can only failover from a primary to its
921
          # secondary.
922
          "sinst-by-pnode": {},
923
        }
924
      except ValueError:
925
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926
        bad = True
927
        continue
928

    
929
    node_vol_should = {}
930

    
931
    for instance in instancelist:
932
      feedback_fn("* Verifying instance %s" % instance)
933
      inst_config = self.cfg.GetInstanceInfo(instance)
934
      result =  self._VerifyInstance(instance, inst_config, node_volume,
935
                                     node_instance, feedback_fn)
936
      bad = bad or result
937

    
938
      inst_config.MapLVsByNode(node_vol_should)
939

    
940
      instance_cfg[instance] = inst_config
941

    
942
      pnode = inst_config.primary_node
943
      if pnode in node_info:
944
        node_info[pnode]['pinst'].append(instance)
945
      else:
946
        feedback_fn("  - ERROR: instance %s, connection to primary node"
947
                    " %s failed" % (instance, pnode))
948
        bad = True
949

    
950
      # If the instance is non-redundant we cannot survive losing its primary
951
      # node, so we are not N+1 compliant. On the other hand we have no disk
952
      # templates with more than one secondary so that situation is not well
953
      # supported either.
954
      # FIXME: does not support file-backed instances
955
      if len(inst_config.secondary_nodes) == 0:
956
        i_non_redundant.append(instance)
957
      elif len(inst_config.secondary_nodes) > 1:
958
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
959
                    % instance)
960

    
961
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962
        i_non_a_balanced.append(instance)
963

    
964
      for snode in inst_config.secondary_nodes:
965
        if snode in node_info:
966
          node_info[snode]['sinst'].append(instance)
967
          if pnode not in node_info[snode]['sinst-by-pnode']:
968
            node_info[snode]['sinst-by-pnode'][pnode] = []
969
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
970
        else:
971
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
972
                      " %s failed" % (instance, snode))
973

    
974
    feedback_fn("* Verifying orphan volumes")
975
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976
                                       feedback_fn)
977
    bad = bad or result
978

    
979
    feedback_fn("* Verifying remaining instances")
980
    result = self._VerifyOrphanInstances(instancelist, node_instance,
981
                                         feedback_fn)
982
    bad = bad or result
983

    
984
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985
      feedback_fn("* Verifying N+1 Memory redundancy")
986
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
987
      bad = bad or result
988

    
989
    feedback_fn("* Other Notes")
990
    if i_non_redundant:
991
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
992
                  % len(i_non_redundant))
993

    
994
    if i_non_a_balanced:
995
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
996
                  % len(i_non_a_balanced))
997

    
998
    return not bad
999

    
1000
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001
    """Analyze the post-hooks' result
1002

1003
    This method analyzes the hook result, handles it, and sends some
1004
    nicely-formatted feedback back to the user.
1005

1006
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008
    @param hooks_results: the results of the multi-node hooks rpc call
1009
    @param feedback_fn: function used to send feedback back to the caller
1010
    @param lu_result: previous Exec result
1011
    @return: the new Exec result, based on the previous result
1012
        and hook results
1013

1014
    """
1015
    # We only really run POST phase hooks, and are only interested in
1016
    # their results
1017
    if phase == constants.HOOKS_PHASE_POST:
1018
      # Used to change hooks' output to proper indentation
1019
      indent_re = re.compile('^', re.M)
1020
      feedback_fn("* Hooks Results")
1021
      if not hooks_results:
1022
        feedback_fn("  - ERROR: general communication failure")
1023
        lu_result = 1
1024
      else:
1025
        for node_name in hooks_results:
1026
          show_node_header = True
1027
          res = hooks_results[node_name]
1028
          if res.failed or res.data is False or not isinstance(res.data, list):
1029
            feedback_fn("    Communication failure in hooks execution")
1030
            lu_result = 1
1031
            continue
1032
          for script, hkr, output in res.data:
1033
            if hkr == constants.HKR_FAIL:
1034
              # The node header is only shown once, if there are
1035
              # failing hooks on that node
1036
              if show_node_header:
1037
                feedback_fn("  Node %s:" % node_name)
1038
                show_node_header = False
1039
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1040
              output = indent_re.sub('      ', output)
1041
              feedback_fn("%s" % output)
1042
              lu_result = 1
1043

    
1044
      return lu_result
1045

    
1046

    
1047
class LUVerifyDisks(NoHooksLU):
1048
  """Verifies the cluster disks status.
1049

1050
  """
1051
  _OP_REQP = []
1052
  REQ_BGL = False
1053

    
1054
  def ExpandNames(self):
1055
    self.needed_locks = {
1056
      locking.LEVEL_NODE: locking.ALL_SET,
1057
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1058
    }
1059
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1060

    
1061
  def CheckPrereq(self):
1062
    """Check prerequisites.
1063

1064
    This has no prerequisites.
1065

1066
    """
1067
    pass
1068

    
1069
  def Exec(self, feedback_fn):
1070
    """Verify integrity of cluster disks.
1071

1072
    """
1073
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1074

    
1075
    vg_name = self.cfg.GetVGName()
1076
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1077
    instances = [self.cfg.GetInstanceInfo(name)
1078
                 for name in self.cfg.GetInstanceList()]
1079

    
1080
    nv_dict = {}
1081
    for inst in instances:
1082
      inst_lvs = {}
1083
      if (inst.status != "up" or
1084
          inst.disk_template not in constants.DTS_NET_MIRROR):
1085
        continue
1086
      inst.MapLVsByNode(inst_lvs)
1087
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088
      for node, vol_list in inst_lvs.iteritems():
1089
        for vol in vol_list:
1090
          nv_dict[(node, vol)] = inst
1091

    
1092
    if not nv_dict:
1093
      return result
1094

    
1095
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1096

    
1097
    to_act = set()
1098
    for node in nodes:
1099
      # node_volume
1100
      lvs = node_lvs[node]
1101
      if lvs.failed:
1102
        self.LogWarning("Connection to node %s failed: %s" %
1103
                        (node, lvs.data))
1104
        continue
1105
      lvs = lvs.data
1106
      if isinstance(lvs, basestring):
1107
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108
        res_nlvm[node] = lvs
1109
      elif not isinstance(lvs, dict):
1110
        logging.warning("Connection to node %s failed or invalid data"
1111
                        " returned", node)
1112
        res_nodes.append(node)
1113
        continue
1114

    
1115
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116
        inst = nv_dict.pop((node, lv_name), None)
1117
        if (not lv_online and inst is not None
1118
            and inst.name not in res_instances):
1119
          res_instances.append(inst.name)
1120

    
1121
    # any leftover items in nv_dict are missing LVs, let's arrange the
1122
    # data better
1123
    for key, inst in nv_dict.iteritems():
1124
      if inst.name not in res_missing:
1125
        res_missing[inst.name] = []
1126
      res_missing[inst.name].append(key)
1127

    
1128
    return result
1129

    
1130

    
1131
class LURenameCluster(LogicalUnit):
1132
  """Rename the cluster.
1133

1134
  """
1135
  HPATH = "cluster-rename"
1136
  HTYPE = constants.HTYPE_CLUSTER
1137
  _OP_REQP = ["name"]
1138

    
1139
  def BuildHooksEnv(self):
1140
    """Build hooks env.
1141

1142
    """
1143
    env = {
1144
      "OP_TARGET": self.cfg.GetClusterName(),
1145
      "NEW_NAME": self.op.name,
1146
      }
1147
    mn = self.cfg.GetMasterNode()
1148
    return env, [mn], [mn]
1149

    
1150
  def CheckPrereq(self):
1151
    """Verify that the passed name is a valid one.
1152

1153
    """
1154
    hostname = utils.HostInfo(self.op.name)
1155

    
1156
    new_name = hostname.name
1157
    self.ip = new_ip = hostname.ip
1158
    old_name = self.cfg.GetClusterName()
1159
    old_ip = self.cfg.GetMasterIP()
1160
    if new_name == old_name and new_ip == old_ip:
1161
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162
                                 " cluster has changed")
1163
    if new_ip != old_ip:
1164
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166
                                   " reachable on the network. Aborting." %
1167
                                   new_ip)
1168

    
1169
    self.op.name = new_name
1170

    
1171
  def Exec(self, feedback_fn):
1172
    """Rename the cluster.
1173

1174
    """
1175
    clustername = self.op.name
1176
    ip = self.ip
1177

    
1178
    # shutdown the master IP
1179
    master = self.cfg.GetMasterNode()
1180
    result = self.rpc.call_node_stop_master(master, False)
1181
    if result.failed or not result.data:
1182
      raise errors.OpExecError("Could not disable the master role")
1183

    
1184
    try:
1185
      cluster = self.cfg.GetClusterInfo()
1186
      cluster.cluster_name = clustername
1187
      cluster.master_ip = ip
1188
      self.cfg.Update(cluster)
1189

    
1190
      # update the known hosts file
1191
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192
      node_list = self.cfg.GetNodeList()
1193
      try:
1194
        node_list.remove(master)
1195
      except ValueError:
1196
        pass
1197
      result = self.rpc.call_upload_file(node_list,
1198
                                         constants.SSH_KNOWN_HOSTS_FILE)
1199
      for to_node, to_result in result.iteritems():
1200
        if to_result.failed or not to_result.data:
1201
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1202

    
1203
    finally:
1204
      result = self.rpc.call_node_start_master(master, False)
1205
      if result.failed or not result.data:
1206
        self.LogWarning("Could not re-enable the master role on"
1207
                        " the master, please restart manually.")
1208

    
1209

    
1210
def _RecursiveCheckIfLVMBased(disk):
1211
  """Check if the given disk or its children are lvm-based.
1212

1213
  @type disk: L{objects.Disk}
1214
  @param disk: the disk to check
1215
  @rtype: boolean
1216
  @return: boolean indicating whether a LD_LV dev_type was found or not
1217

1218
  """
1219
  if disk.children:
1220
    for chdisk in disk.children:
1221
      if _RecursiveCheckIfLVMBased(chdisk):
1222
        return True
1223
  return disk.dev_type == constants.LD_LV
1224

    
1225

    
1226
class LUSetClusterParams(LogicalUnit):
1227
  """Change the parameters of the cluster.
1228

1229
  """
1230
  HPATH = "cluster-modify"
1231
  HTYPE = constants.HTYPE_CLUSTER
1232
  _OP_REQP = []
1233
  REQ_BGL = False
1234

    
1235
  def CheckArguments(self):
1236
    """Check parameters
1237

1238
    """
1239
    if not hasattr(self.op, "candidate_pool_size"):
1240
      self.op.candidate_pool_size = None
1241
    if self.op.candidate_pool_size is not None:
1242
      try:
1243
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244
      except ValueError, err:
1245
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1246
                                   str(err))
1247
      if self.op.candidate_pool_size < 1:
1248
        raise errors.OpPrereqError("At least one master candidate needed")
1249

    
1250
  def ExpandNames(self):
1251
    # FIXME: in the future maybe other cluster params won't require checking on
1252
    # all nodes to be modified.
1253
    self.needed_locks = {
1254
      locking.LEVEL_NODE: locking.ALL_SET,
1255
    }
1256
    self.share_locks[locking.LEVEL_NODE] = 1
1257

    
1258
  def BuildHooksEnv(self):
1259
    """Build hooks env.
1260

1261
    """
1262
    env = {
1263
      "OP_TARGET": self.cfg.GetClusterName(),
1264
      "NEW_VG_NAME": self.op.vg_name,
1265
      }
1266
    mn = self.cfg.GetMasterNode()
1267
    return env, [mn], [mn]
1268

    
1269
  def CheckPrereq(self):
1270
    """Check prerequisites.
1271

1272
    This checks whether the given params don't conflict and
1273
    if the given volume group is valid.
1274

1275
    """
1276
    # FIXME: This only works because there is only one parameter that can be
1277
    # changed or removed.
1278
    if self.op.vg_name is not None and not self.op.vg_name:
1279
      instances = self.cfg.GetAllInstancesInfo().values()
1280
      for inst in instances:
1281
        for disk in inst.disks:
1282
          if _RecursiveCheckIfLVMBased(disk):
1283
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1284
                                       " lvm-based instances exist")
1285

    
1286
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1287

    
1288
    # if vg_name not None, checks given volume group on all nodes
1289
    if self.op.vg_name:
1290
      vglist = self.rpc.call_vg_list(node_list)
1291
      for node in node_list:
1292
        if vglist[node].failed:
1293
          # ignoring down node
1294
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1295
          continue
1296
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1297
                                              self.op.vg_name,
1298
                                              constants.MIN_VG_SIZE)
1299
        if vgstatus:
1300
          raise errors.OpPrereqError("Error on node '%s': %s" %
1301
                                     (node, vgstatus))
1302

    
1303
    self.cluster = cluster = self.cfg.GetClusterInfo()
1304
    # validate beparams changes
1305
    if self.op.beparams:
1306
      utils.CheckBEParams(self.op.beparams)
1307
      self.new_beparams = cluster.FillDict(
1308
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1309

    
1310
    # hypervisor list/parameters
1311
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312
    if self.op.hvparams:
1313
      if not isinstance(self.op.hvparams, dict):
1314
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315
      for hv_name, hv_dict in self.op.hvparams.items():
1316
        if hv_name not in self.new_hvparams:
1317
          self.new_hvparams[hv_name] = hv_dict
1318
        else:
1319
          self.new_hvparams[hv_name].update(hv_dict)
1320

    
1321
    if self.op.enabled_hypervisors is not None:
1322
      self.hv_list = self.op.enabled_hypervisors
1323
    else:
1324
      self.hv_list = cluster.enabled_hypervisors
1325

    
1326
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327
      # either the enabled list has changed, or the parameters have, validate
1328
      for hv_name, hv_params in self.new_hvparams.items():
1329
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330
            (self.op.enabled_hypervisors and
1331
             hv_name in self.op.enabled_hypervisors)):
1332
          # either this is a new hypervisor, or its parameters have changed
1333
          hv_class = hypervisor.GetHypervisor(hv_name)
1334
          hv_class.CheckParameterSyntax(hv_params)
1335
          _CheckHVParams(self, node_list, hv_name, hv_params)
1336

    
1337
  def Exec(self, feedback_fn):
1338
    """Change the parameters of the cluster.
1339

1340
    """
1341
    if self.op.vg_name is not None:
1342
      if self.op.vg_name != self.cfg.GetVGName():
1343
        self.cfg.SetVGName(self.op.vg_name)
1344
      else:
1345
        feedback_fn("Cluster LVM configuration already in desired"
1346
                    " state, not changing")
1347
    if self.op.hvparams:
1348
      self.cluster.hvparams = self.new_hvparams
1349
    if self.op.enabled_hypervisors is not None:
1350
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351
    if self.op.beparams:
1352
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353
    if self.op.candidate_pool_size is not None:
1354
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1355

    
1356
    self.cfg.Update(self.cluster)
1357

    
1358
    # we want to update nodes after the cluster so that if any errors
1359
    # happen, we have recorded and saved the cluster info
1360
    if self.op.candidate_pool_size is not None:
1361
      node_info = self.cfg.GetAllNodesInfo().values()
1362
      num_candidates = len([node for node in node_info
1363
                            if node.master_candidate])
1364
      num_nodes = len(node_info)
1365
      if num_candidates < self.op.candidate_pool_size:
1366
        random.shuffle(node_info)
1367
        for node in node_info:
1368
          if num_candidates >= self.op.candidate_pool_size:
1369
            break
1370
          if node.master_candidate:
1371
            continue
1372
          node.master_candidate = True
1373
          self.LogInfo("Promoting node %s to master candidate", node.name)
1374
          self.cfg.Update(node)
1375
          self.context.ReaddNode(node)
1376
          num_candidates += 1
1377
      elif num_candidates > self.op.candidate_pool_size:
1378
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379
                     " of candidate_pool_size (%d)" %
1380
                     (num_candidates, self.op.candidate_pool_size))
1381

    
1382

    
1383
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384
  """Sleep and poll for an instance's disk to sync.
1385

1386
  """
1387
  if not instance.disks:
1388
    return True
1389

    
1390
  if not oneshot:
1391
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1392

    
1393
  node = instance.primary_node
1394

    
1395
  for dev in instance.disks:
1396
    lu.cfg.SetDiskID(dev, node)
1397

    
1398
  retries = 0
1399
  while True:
1400
    max_time = 0
1401
    done = True
1402
    cumul_degraded = False
1403
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404
    if rstats.failed or not rstats.data:
1405
      lu.LogWarning("Can't get any data from node %s", node)
1406
      retries += 1
1407
      if retries >= 10:
1408
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1409
                                 " aborting." % node)
1410
      time.sleep(6)
1411
      continue
1412
    rstats = rstats.data
1413
    retries = 0
1414
    for i in range(len(rstats)):
1415
      mstat = rstats[i]
1416
      if mstat is None:
1417
        lu.LogWarning("Can't compute data for node %s/%s",
1418
                           node, instance.disks[i].iv_name)
1419
        continue
1420
      # we ignore the ldisk parameter
1421
      perc_done, est_time, is_degraded, _ = mstat
1422
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423
      if perc_done is not None:
1424
        done = False
1425
        if est_time is not None:
1426
          rem_time = "%d estimated seconds remaining" % est_time
1427
          max_time = est_time
1428
        else:
1429
          rem_time = "no time estimate"
1430
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431
                        (instance.disks[i].iv_name, perc_done, rem_time))
1432
    if done or oneshot:
1433
      break
1434

    
1435
    time.sleep(min(60, max_time))
1436

    
1437
  if done:
1438
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439
  return not cumul_degraded
1440

    
1441

    
1442
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443
  """Check that mirrors are not degraded.
1444

1445
  The ldisk parameter, if True, will change the test from the
1446
  is_degraded attribute (which represents overall non-ok status for
1447
  the device(s)) to the ldisk (representing the local storage status).
1448

1449
  """
1450
  lu.cfg.SetDiskID(dev, node)
  # index into the status tuple returned by call_blockdev_find: entry 5 is
  # the overall is_degraded flag, entry 6 the local-disk (ldisk) status
  if ldisk:
    idx = 6
  else:
    idx = 5
1455

    
1456
  result = True
1457
  if on_primary or dev.AssembleOnSecondary():
1458
    rstats = lu.rpc.call_blockdev_find(node, dev)
1459
    if rstats.failed or not rstats.data:
1460
      logging.warning("Node %s: disk degraded, not found or node down", node)
1461
      result = False
1462
    else:
1463
      result = result and (not rstats.data[idx])
1464
  if dev.children:
1465
    for child in dev.children:
1466
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1467

    
1468
  return result
1469

    
1470

    
1471
class LUDiagnoseOS(NoHooksLU):
1472
  """Logical unit for OS diagnose/query.
1473

1474
  """
1475
  _OP_REQP = ["output_fields", "names"]
1476
  REQ_BGL = False
1477
  _FIELDS_STATIC = utils.FieldSet()
1478
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1479

    
1480
  def ExpandNames(self):
1481
    if self.op.names:
1482
      raise errors.OpPrereqError("Selective OS query not supported")
1483

    
1484
    _CheckOutputFields(static=self._FIELDS_STATIC,
1485
                       dynamic=self._FIELDS_DYNAMIC,
1486
                       selected=self.op.output_fields)
1487

    
1488
    # Lock all nodes, in shared mode
1489
    self.needed_locks = {}
1490
    self.share_locks[locking.LEVEL_NODE] = 1
1491
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1492

    
1493
  def CheckPrereq(self):
1494
    """Check prerequisites.
1495

1496
    """
1497

    
1498
  @staticmethod
1499
  def _DiagnoseByOS(node_list, rlist):
1500
    """Remaps a per-node return list into an a per-os per-node dictionary
1501

1502
    @param node_list: a list with the names of all nodes
1503
    @param rlist: a map with node names as keys and OS objects as values
1504

1505
    @rtype: dict
1506
    @returns: a dictionary with osnames as keys and as value another map, with
1507
        nodes as keys and list of OS objects as values, eg::
1508

1509
          {"debian-etch": {"node1": [<object>,...],
1510
                           "node2": [<object>,]}
1511
          }
1512

1513
    """
1514
    all_os = {}
1515
    for node_name, nr in rlist.iteritems():
1516
      if nr.failed or not nr.data:
1517
        continue
1518
      for os_obj in nr.data:
1519
        if os_obj.name not in all_os:
1520
          # build a list of nodes for this os containing empty lists
1521
          # for each node in node_list
1522
          all_os[os_obj.name] = {}
1523
          for nname in node_list:
1524
            all_os[os_obj.name][nname] = []
1525
        all_os[os_obj.name][node_name].append(os_obj)
1526
    return all_os
1527

    
1528
  def Exec(self, feedback_fn):
1529
    """Compute the list of OSes.
1530

1531
    """
1532
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1533
    node_data = self.rpc.call_os_diagnose(node_list)
1534
    if node_data == False:
1535
      raise errors.OpExecError("Can't gather the list of OSes")
1536
    pol = self._DiagnoseByOS(node_list, node_data)
1537
    output = []
1538
    for os_name, os_data in pol.iteritems():
1539
      row = []
1540
      for field in self.op.output_fields:
1541
        if field == "name":
1542
          val = os_name
1543
        elif field == "valid":
1544
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1545
        elif field == "node_status":
1546
          val = {}
1547
          for node_name, nos_list in os_data.iteritems():
1548
            val[node_name] = [(v.status, v.path) for v in nos_list]
1549
        else:
1550
          raise errors.ParameterError(field)
1551
        row.append(val)
1552
      output.append(row)
1553

    
1554
    return output
1555

    
1556

    
1557
class LURemoveNode(LogicalUnit):
1558
  """Logical unit for removing a node.
1559

1560
  """
1561
  HPATH = "node-remove"
1562
  HTYPE = constants.HTYPE_NODE
1563
  _OP_REQP = ["node_name"]
1564

    
1565
  def BuildHooksEnv(self):
1566
    """Build hooks env.
1567

1568
    This doesn't run on the target node in the pre phase as a failed
1569
    node would then be impossible to remove.
1570

1571
    """
1572
    env = {
1573
      "OP_TARGET": self.op.node_name,
1574
      "NODE_NAME": self.op.node_name,
1575
      }
1576
    all_nodes = self.cfg.GetNodeList()
1577
    all_nodes.remove(self.op.node_name)
1578
    return env, all_nodes, all_nodes
1579

    
1580
  def CheckPrereq(self):
1581
    """Check prerequisites.
1582

1583
    This checks:
1584
     - the node exists in the configuration
1585
     - it does not have primary or secondary instances
1586
     - it's not the master
1587

1588
    Any errors are signalled by raising errors.OpPrereqError.
1589

1590
    """
1591
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1594

    
1595
    instance_list = self.cfg.GetInstanceList()
1596

    
1597
    masternode = self.cfg.GetMasterNode()
1598
    if node.name == masternode:
1599
      raise errors.OpPrereqError("Node is the master node,"
1600
                                 " you need to failover first.")
1601

    
1602
    for instance_name in instance_list:
1603
      instance = self.cfg.GetInstanceInfo(instance_name)
1604
      if node.name == instance.primary_node:
1605
        raise errors.OpPrereqError("Instance %s still running on the node,"
1606
                                   " please remove first." % instance_name)
1607
      if node.name in instance.secondary_nodes:
1608
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609
                                   " please remove first." % instance_name)
1610
    self.op.node_name = node.name
1611
    self.node = node
1612

    
1613
  def Exec(self, feedback_fn):
1614
    """Removes the node from the cluster.
1615

1616
    """
1617
    node = self.node
1618
    logging.info("Stopping the node daemon and removing configs from node %s",
1619
                 node.name)
1620

    
1621
    self.context.RemoveNode(node.name)
1622

    
1623
    self.rpc.call_node_leave_cluster(node.name)
1624

    
1625
    # Promote nodes to master candidate as needed
1626
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1627
    node_info = self.cfg.GetAllNodesInfo().values()
1628
    num_candidates = len([n for n in node_info
1629
                          if n.master_candidate])
1630
    num_nodes = len(node_info)
1631
    random.shuffle(node_info)
1632
    for node in node_info:
1633
      if num_candidates >= cp_size or num_candidates >= num_nodes:
1634
        break
1635
      if node.master_candidate:
1636
        continue
1637
      node.master_candidate = True
1638
      self.LogInfo("Promoting node %s to master candidate", node.name)
1639
      self.cfg.Update(node)
1640
      self.context.ReaddNode(node)
1641
      num_candidates += 1
1642

    
1643

    
1644
class LUQueryNodes(NoHooksLU):
1645
  """Logical unit for querying nodes.
1646

1647
  """
1648
  _OP_REQP = ["output_fields", "names"]
1649
  REQ_BGL = False
1650
  _FIELDS_DYNAMIC = utils.FieldSet(
1651
    "dtotal", "dfree",
1652
    "mtotal", "mnode", "mfree",
1653
    "bootid",
1654
    "ctotal",
1655
    )
1656

    
1657
  _FIELDS_STATIC = utils.FieldSet(
1658
    "name", "pinst_cnt", "sinst_cnt",
1659
    "pinst_list", "sinst_list",
1660
    "pip", "sip", "tags",
1661
    "serial_no",
1662
    "master_candidate",
1663
    "master",
1664
    "offline",
1665
    )
1666

    
1667
  def ExpandNames(self):
1668
    _CheckOutputFields(static=self._FIELDS_STATIC,
1669
                       dynamic=self._FIELDS_DYNAMIC,
1670
                       selected=self.op.output_fields)
1671

    
1672
    self.needed_locks = {}
1673
    self.share_locks[locking.LEVEL_NODE] = 1
1674

    
1675
    if self.op.names:
1676
      self.wanted = _GetWantedNodes(self, self.op.names)
1677
    else:
1678
      self.wanted = locking.ALL_SET
1679

    
1680
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1681
    if self.do_locking:
1682
      # if we don't request only static fields, we need to lock the nodes
1683
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1684

    
1685

    
1686
  def CheckPrereq(self):
1687
    """Check prerequisites.
1688

1689
    """
1690
    # The validation of the node list is done in the _GetWantedNodes,
1691
    # if non empty, and if empty, there's no validation to do
1692
    pass
1693

    
1694
  def Exec(self, feedback_fn):
1695
    """Computes the list of nodes and their attributes.
1696

1697
    """
1698
    all_info = self.cfg.GetAllNodesInfo()
1699
    if self.do_locking:
1700
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1701
    elif self.wanted != locking.ALL_SET:
1702
      nodenames = self.wanted
1703
      missing = set(nodenames).difference(all_info.keys())
1704
      if missing:
1705
        raise errors.OpExecError(
1706
          "Some nodes were removed before retrieving their data: %s" % missing)
1707
    else:
1708
      nodenames = all_info.keys()
1709

    
1710
    nodenames = utils.NiceSort(nodenames)
1711
    nodelist = [all_info[name] for name in nodenames]
1712

    
1713
    # begin data gathering
1714

    
1715
    if self.do_locking:
1716
      live_data = {}
1717
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1718
                                          self.cfg.GetHypervisorType())
1719
      for name in nodenames:
1720
        nodeinfo = node_data[name]
1721
        if not nodeinfo.failed and nodeinfo.data:
1722
          nodeinfo = nodeinfo.data
1723
          fn = utils.TryConvert
1724
          live_data[name] = {
1725
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1726
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1727
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1728
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1729
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1730
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1731
            "bootid": nodeinfo.get('bootid', None),
1732
            }
1733
        else:
1734
          live_data[name] = {}
1735
    else:
1736
      live_data = dict.fromkeys(nodenames, {})
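      # only static fields were requested, so no RPC is needed and the
      # live data stays empty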
1737

    
1738
    node_to_primary = dict([(name, set()) for name in nodenames])
1739
    node_to_secondary = dict([(name, set()) for name in nodenames])
1740

    
1741
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1742
                             "sinst_cnt", "sinst_list"))
1743
    if inst_fields & frozenset(self.op.output_fields):
1744
      instancelist = self.cfg.GetInstanceList()
1745

    
1746
      for instance_name in instancelist:
1747
        inst = self.cfg.GetInstanceInfo(instance_name)
1748
        if inst.primary_node in node_to_primary:
1749
          node_to_primary[inst.primary_node].add(inst.name)
1750
        for secnode in inst.secondary_nodes:
1751
          if secnode in node_to_secondary:
1752
            node_to_secondary[secnode].add(inst.name)
1753

    
1754
    master_node = self.cfg.GetMasterNode()
1755

    
1756
    # end data gathering
1757

    
1758
    output = []
1759
    for node in nodelist:
1760
      node_output = []
1761
      for field in self.op.output_fields:
1762
        if field == "name":
1763
          val = node.name
1764
        elif field == "pinst_list":
1765
          val = list(node_to_primary[node.name])
1766
        elif field == "sinst_list":
1767
          val = list(node_to_secondary[node.name])
1768
        elif field == "pinst_cnt":
1769
          val = len(node_to_primary[node.name])
1770
        elif field == "sinst_cnt":
1771
          val = len(node_to_secondary[node.name])
1772
        elif field == "pip":
1773
          val = node.primary_ip
1774
        elif field == "sip":
1775
          val = node.secondary_ip
1776
        elif field == "tags":
1777
          val = list(node.GetTags())
1778
        elif field == "serial_no":
1779
          val = node.serial_no
1780
        elif field == "master_candidate":
1781
          val = node.master_candidate
1782
        elif field == "master":
1783
          val = node.name == master_node
1784
        elif field == "offline":
1785
          val = node.offline
1786
        elif self._FIELDS_DYNAMIC.Matches(field):
1787
          val = live_data[node.name].get(field, None)
1788
        else:
1789
          raise errors.ParameterError(field)
1790
        node_output.append(val)
1791
      output.append(node_output)
1792

    
1793
    return output
1794

    
1795

    
1796
class LUQueryNodeVolumes(NoHooksLU):
1797
  """Logical unit for getting volumes on node(s).
1798

1799
  """
1800
  _OP_REQP = ["nodes", "output_fields"]
1801
  REQ_BGL = False
1802
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1803
  _FIELDS_STATIC = utils.FieldSet("node")
1804

    
1805
  def ExpandNames(self):
1806
    _CheckOutputFields(static=self._FIELDS_STATIC,
1807
                       dynamic=self._FIELDS_DYNAMIC,
1808
                       selected=self.op.output_fields)
1809

    
1810
    self.needed_locks = {}
1811
    self.share_locks[locking.LEVEL_NODE] = 1
1812
    if not self.op.nodes:
1813
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1814
    else:
1815
      self.needed_locks[locking.LEVEL_NODE] = \
1816
        _GetWantedNodes(self, self.op.nodes)
1817

    
1818
  def CheckPrereq(self):
1819
    """Check prerequisites.
1820

1821
    This checks that the fields required are valid output fields.
1822

1823
    """
1824
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1825

    
1826
  def Exec(self, feedback_fn):
1827
    """Computes the list of nodes and their attributes.
1828

1829
    """
1830
    nodenames = self.nodes
1831
    volumes = self.rpc.call_node_volumes(nodenames)
1832

    
1833
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1834
             in self.cfg.GetInstanceList()]
1835

    
1836
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1837

    
1838
    output = []
1839
    for node in nodenames:
1840
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1841
        continue
1842

    
1843
      node_vols = volumes[node].data[:]
1844
      node_vols.sort(key=lambda vol: vol['dev'])
1845

    
1846
      for vol in node_vols:
1847
        node_output = []
1848
        for field in self.op.output_fields:
1849
          if field == "node":
1850
            val = node
1851
          elif field == "phys":
1852
            val = vol['dev']
1853
          elif field == "vg":
1854
            val = vol['vg']
1855
          elif field == "name":
1856
            val = vol['name']
1857
          elif field == "size":
1858
            val = int(float(vol['size']))
1859
          elif field == "instance":
1860
            for inst in ilist:
1861
              if node not in lv_by_node[inst]:
1862
                continue
1863
              if vol['name'] in lv_by_node[inst][node]:
1864
                val = inst.name
1865
                break
1866
            else:
1867
              val = '-'
1868
          else:
1869
            raise errors.ParameterError(field)
1870
          node_output.append(str(val))
1871

    
1872
        output.append(node_output)
1873

    
1874
    return output
1875

    
1876

    
1877
class LUAddNode(LogicalUnit):
1878
  """Logical unit for adding node to the cluster.
1879

1880
  """
1881
  HPATH = "node-add"
1882
  HTYPE = constants.HTYPE_NODE
1883
  _OP_REQP = ["node_name"]
1884

    
1885
  def BuildHooksEnv(self):
1886
    """Build hooks env.
1887

1888
    This will run on all nodes before, and on all nodes + the new node after.
1889

1890
    """
1891
    env = {
1892
      "OP_TARGET": self.op.node_name,
1893
      "NODE_NAME": self.op.node_name,
1894
      "NODE_PIP": self.op.primary_ip,
1895
      "NODE_SIP": self.op.secondary_ip,
1896
      }
1897
    nodes_0 = self.cfg.GetNodeList()
1898
    nodes_1 = nodes_0 + [self.op.node_name, ]
1899
    return env, nodes_0, nodes_1
1900

    
1901
  def CheckPrereq(self):
1902
    """Check prerequisites.
1903

1904
    This checks:
1905
     - the new node is not already in the config
1906
     - it is resolvable
1907
     - its parameters (single/dual homed) matches the cluster
1908

1909
    Any errors are signalled by raising errors.OpPrereqError.
1910

1911
    """
1912
    node_name = self.op.node_name
1913
    cfg = self.cfg
1914

    
1915
    dns_data = utils.HostInfo(node_name)
1916

    
1917
    node = dns_data.name
1918
    primary_ip = self.op.primary_ip = dns_data.ip
1919
    secondary_ip = getattr(self.op, "secondary_ip", None)
1920
    if secondary_ip is None:
1921
      secondary_ip = primary_ip
1922
    if not utils.IsValidIP(secondary_ip):
1923
      raise errors.OpPrereqError("Invalid secondary IP given")
1924
    self.op.secondary_ip = secondary_ip
1925

    
1926
    node_list = cfg.GetNodeList()
1927
    if not self.op.readd and node in node_list:
1928
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1929
                                 node)
1930
    elif self.op.readd and node not in node_list:
1931
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1932

    
1933
    for existing_node_name in node_list:
1934
      existing_node = cfg.GetNodeInfo(existing_node_name)
1935

    
1936
      if self.op.readd and node == existing_node_name:
1937
        if (existing_node.primary_ip != primary_ip or
1938
            existing_node.secondary_ip != secondary_ip):
1939
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1940
                                     " address configuration as before")
1941
        continue
1942

    
1943
      if (existing_node.primary_ip == primary_ip or
1944
          existing_node.secondary_ip == primary_ip or
1945
          existing_node.primary_ip == secondary_ip or
1946
          existing_node.secondary_ip == secondary_ip):
1947
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1948
                                   " existing node %s" % existing_node.name)
1949

    
1950
    # check that the type of the node (single versus dual homed) is the
1951
    # same as for the master
1952
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1953
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1954
    newbie_singlehomed = secondary_ip == primary_ip
1955
    if master_singlehomed != newbie_singlehomed:
1956
      if master_singlehomed:
1957
        raise errors.OpPrereqError("The master has no private ip but the"
1958
                                   " new node has one")
1959
      else:
1960
        raise errors.OpPrereqError("The master has a private ip but the"
1961
                                   " new node doesn't have one")
1962

    
1963
    # check reachability
1964
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1965
      raise errors.OpPrereqError("Node not reachable by ping")
1966

    
1967
    if not newbie_singlehomed:
1968
      # check reachability from my secondary ip to newbie's secondary ip
1969
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1970
                           source=myself.secondary_ip):
1971
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1972
                                   " based ping to noded port")
1973

    
1974
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1975
    node_info = self.cfg.GetAllNodesInfo().values()
1976
    num_candidates = len([n for n in node_info
1977
                          if n.master_candidate])
1978
    master_candidate = num_candidates < cp_size
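    # the new node becomes a master candidate only if the candidate pool
    # is not already full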
1979

    
1980
    self.new_node = objects.Node(name=node,
1981
                                 primary_ip=primary_ip,
1982
                                 secondary_ip=secondary_ip,
1983
                                 master_candidate=master_candidate,
1984
                                 offline=False)
1985

    
1986
  def Exec(self, feedback_fn):
1987
    """Adds the new node to the cluster.
1988

1989
    """
1990
    new_node = self.new_node
1991
    node = new_node.name
1992

    
1993
    # check connectivity
1994
    result = self.rpc.call_version([node])[node]
1995
    result.Raise()
1996
    if result.data:
1997
      if constants.PROTOCOL_VERSION == result.data:
1998
        logging.info("Communication to node %s fine, sw version %s match",
1999
                     node, result.data)
2000
      else:
2001
        raise errors.OpExecError("Version mismatch master version %s,"
2002
                                 " node version %s" %
2003
                                 (constants.PROTOCOL_VERSION, result.data))
2004
    else:
2005
      raise errors.OpExecError("Cannot get version from the new node")
2006

    
2007
    # setup ssh on node
2008
    logging.info("Copy ssh key to node %s", node)
2009
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2010
    keyarray = []
2011
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2012
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2013
                priv_key, pub_key]
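    # the key files are read into keyarray below in this same order, which
    # is the order in which call_node_add expects them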
2014

    
2015
    for i in keyfiles:
2016
      f = open(i, 'r')
2017
      try:
2018
        keyarray.append(f.read())
2019
      finally:
2020
        f.close()
2021

    
2022
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2023
                                    keyarray[2],
2024
                                    keyarray[3], keyarray[4], keyarray[5])
2025

    
2026
    if result.failed or not result.data:
2027
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2028

    
2029
    # Add node to our /etc/hosts, and add key to known_hosts
2030
    utils.AddHostToEtcHosts(new_node.name)
2031

    
2032
    if new_node.secondary_ip != new_node.primary_ip:
2033
      result = self.rpc.call_node_has_ip_address(new_node.name,
2034
                                                 new_node.secondary_ip)
2035
      if result.failed or not result.data:
2036
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2037
                                 " you gave (%s). Please fix and re-run this"
2038
                                 " command." % new_node.secondary_ip)
2039

    
2040
    node_verify_list = [self.cfg.GetMasterNode()]
2041
    node_verify_param = {
2042
      'nodelist': [node],
2043
      # TODO: do a node-net-test as well?
2044
    }
2045

    
2046
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2047
                                       self.cfg.GetClusterName())
2048
    for verifier in node_verify_list:
2049
      if result[verifier].failed or not result[verifier].data:
2050
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2051
                                 " for remote verification" % verifier)
2052
      if result[verifier].data['nodelist']:
2053
        for failed in result[verifier].data['nodelist']:
2054
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2055
                      (verifier, result[verifier].data['nodelist'][failed]))
2056
        raise errors.OpExecError("ssh/hostname verification failed.")
2057

    
2058
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2059
    # including the node just added
2060
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2061
    dist_nodes = self.cfg.GetNodeList()
2062
    if not self.op.readd:
2063
      dist_nodes.append(node)
2064
    if myself.name in dist_nodes:
2065
      dist_nodes.remove(myself.name)
2066

    
2067
    logging.debug("Copying hosts and known_hosts to all nodes")
2068
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2069
      result = self.rpc.call_upload_file(dist_nodes, fname)
2070
      for to_node, to_result in result.iteritems():
2071
        if to_result.failed or not to_result.data:
2072
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2073

    
2074
    to_copy = []
2075
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2076
      to_copy.append(constants.VNC_PASSWORD_FILE)
2077
    for fname in to_copy:
2078
      result = self.rpc.call_upload_file([node], fname)
2079
      if result[node].failed or not result[node].data:
2080
        logging.error("Could not copy file %s to node %s", fname, node)
2081

    
2082
    if self.op.readd:
2083
      self.context.ReaddNode(new_node)
2084
    else:
2085
      self.context.AddNode(new_node)
2086

    
2087

    
2088
class LUSetNodeParams(LogicalUnit):
2089
  """Modifies the parameters of a node.
2090

2091
  """
2092
  HPATH = "node-modify"
2093
  HTYPE = constants.HTYPE_NODE
2094
  _OP_REQP = ["node_name"]
2095
  REQ_BGL = False
2096

    
2097
  def CheckArguments(self):
2098
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2099
    if node_name is None:
2100
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2101
    self.op.node_name = node_name
2102
    if not hasattr(self.op, 'master_candidate'):
2103
      raise errors.OpPrereqError("Please pass at least one modification")
2104
    self.op.master_candidate = bool(self.op.master_candidate)
2105

    
2106
  def ExpandNames(self):
2107
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2108

    
2109
  def BuildHooksEnv(self):
2110
    """Build hooks env.
2111

2112
    This runs on the master node.
2113

2114
    """
2115
    env = {
2116
      "OP_TARGET": self.op.node_name,
2117
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2118
      }
2119
    nl = [self.cfg.GetMasterNode(),
2120
          self.op.node_name]
2121
    return env, nl, nl
2122

    
2123
  def CheckPrereq(self):
2124
    """Check prerequisites.
2125

2126
    This only checks the instance list against the existing names.
2127

2128
    """
2129
    force = self.force = self.op.force
2130

    
2131
    if self.op.master_candidate == False:
2132
      if self.op.node_name == self.cfg.GetMasterNode():
2133
        raise errors.OpPrereqError("The master node has to be a"
2134
                                   " master candidate")
2135
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2136
      node_info = self.cfg.GetAllNodesInfo().values()
2137
      num_candidates = len([node for node in node_info
2138
                            if node.master_candidate])
2139
      if num_candidates <= cp_size:
2140
        msg = ("Not enough master candidates (desired"
2141
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2142
        if force:
2143
          self.LogWarning(msg)
2144
        else:
2145
          raise errors.OpPrereqError(msg)
2146

    
2147
    return
2148

    
2149
  def Exec(self, feedback_fn):
2150
    """Modifies a node.
2151

2152
    """
2153
    node = self.cfg.GetNodeInfo(self.op.node_name)
2154

    
2155
    result = []
2156

    
2157
    if self.op.master_candidate is not None:
2158
      node.master_candidate = self.op.master_candidate
2159
      result.append(("master_candidate", str(self.op.master_candidate)))
2160

    
2161
    # this will trigger configuration file update, if needed
2162
    self.cfg.Update(node)
2163
    # this will trigger job queue propagation or cleanup
2164
    if self.op.node_name != self.cfg.GetMasterNode():
2165
      self.context.ReaddNode(node)
2166

    
2167
    return result
2168

    
2169

    
2170
class LUQueryClusterInfo(NoHooksLU):
2171
  """Query cluster configuration.
2172

2173
  """
2174
  _OP_REQP = []
2175
  REQ_BGL = False
2176

    
2177
  def ExpandNames(self):
2178
    self.needed_locks = {}
2179

    
2180
  def CheckPrereq(self):
2181
    """No prerequsites needed for this LU.
2182

2183
    """
2184
    pass
2185

    
2186
  def Exec(self, feedback_fn):
2187
    """Return cluster config.
2188

2189
    """
2190
    cluster = self.cfg.GetClusterInfo()
2191
    result = {
2192
      "software_version": constants.RELEASE_VERSION,
2193
      "protocol_version": constants.PROTOCOL_VERSION,
2194
      "config_version": constants.CONFIG_VERSION,
2195
      "os_api_version": constants.OS_API_VERSION,
2196
      "export_version": constants.EXPORT_VERSION,
2197
      "architecture": (platform.architecture()[0], platform.machine()),
2198
      "name": cluster.cluster_name,
2199
      "master": cluster.master_node,
2200
      "default_hypervisor": cluster.default_hypervisor,
2201
      "enabled_hypervisors": cluster.enabled_hypervisors,
2202
      "hvparams": cluster.hvparams,
2203
      "beparams": cluster.beparams,
2204
      "candidate_pool_size": cluster.candidate_pool_size,
2205
      }
2206

    
2207
    return result
2208

    
2209

    
2210
class LUQueryConfigValues(NoHooksLU):
2211
  """Return configuration values.
2212

2213
  """
2214
  _OP_REQP = []
2215
  REQ_BGL = False
2216
  _FIELDS_DYNAMIC = utils.FieldSet()
2217
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2218

    
2219
  def ExpandNames(self):
2220
    self.needed_locks = {}
2221

    
2222
    _CheckOutputFields(static=self._FIELDS_STATIC,
2223
                       dynamic=self._FIELDS_DYNAMIC,
2224
                       selected=self.op.output_fields)
2225

    
2226
  def CheckPrereq(self):
2227
    """No prerequisites.
2228

2229
    """
2230
    pass
2231

    
2232
  def Exec(self, feedback_fn):
2233
    """Dump a representation of the cluster config to the standard output.
2234

2235
    """
2236
    values = []
2237
    for field in self.op.output_fields:
2238
      if field == "cluster_name":
2239
        entry = self.cfg.GetClusterName()
2240
      elif field == "master_node":
2241
        entry = self.cfg.GetMasterNode()
2242
      elif field == "drain_flag":
2243
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
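        # the queue drain flag is represented simply by the existence of
        # the drain file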
2244
      else:
2245
        raise errors.ParameterError(field)
2246
      values.append(entry)
2247
    return values
2248

    
2249

    
2250
class LUActivateInstanceDisks(NoHooksLU):
2251
  """Bring up an instance's disks.
2252

2253
  """
2254
  _OP_REQP = ["instance_name"]
2255
  REQ_BGL = False
2256

    
2257
  def ExpandNames(self):
2258
    self._ExpandAndLockInstance()
2259
    self.needed_locks[locking.LEVEL_NODE] = []
2260
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2261

    
2262
  def DeclareLocks(self, level):
2263
    if level == locking.LEVEL_NODE:
2264
      self._LockInstancesNodes()
2265

    
2266
  def CheckPrereq(self):
2267
    """Check prerequisites.
2268

2269
    This checks that the instance is in the cluster.
2270

2271
    """
2272
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2273
    assert self.instance is not None, \
2274
      "Cannot retrieve locked instance %s" % self.op.instance_name
2275

    
2276
  def Exec(self, feedback_fn):
2277
    """Activate the disks.
2278

2279
    """
2280
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2281
    if not disks_ok:
2282
      raise errors.OpExecError("Cannot activate block devices")
2283

    
2284
    return disks_info
2285

    
2286

    
2287
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2288
  """Prepare the block devices for an instance.
2289

2290
  This sets up the block devices on all nodes.
2291

2292
  @type lu: L{LogicalUnit}
2293
  @param lu: the logical unit on whose behalf we execute
2294
  @type instance: L{objects.Instance}
2295
  @param instance: the instance for whose disks we assemble
2296
  @type ignore_secondaries: boolean
2297
  @param ignore_secondaries: if true, errors on secondary nodes
2298
      won't result in an error return from the function
2299
  @return: False if the operation failed, otherwise a list of
2300
      (host, instance_visible_name, node_visible_name)
2301
      with the mapping from node devices to instance devices
2302

2303
  """
2304
  device_info = []
2305
  disks_ok = True
2306
  iname = instance.name
2307
  # With the two passes mechanism we try to reduce the window of
2308
  # opportunity for the race condition of switching DRBD to primary
2309
  # before handshaking occurred, but we do not eliminate it
2310

    
2311
  # The proper fix would be to wait (with some limits) until the
2312
  # connection has been made and drbd transitions from WFConnection
2313
  # into any other network-connected state (Connected, SyncTarget,
2314
  # SyncSource, etc.)
2315

    
2316
  # 1st pass, assemble on all nodes in secondary mode
2317
  for inst_disk in instance.disks:
2318
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2319
      lu.cfg.SetDiskID(node_disk, node)
2320
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2321
      if result.failed or not result:
2322
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2323
                           " (is_primary=False, pass=1)",
2324
                           inst_disk.iv_name, node)
2325
        if not ignore_secondaries:
2326
          disks_ok = False
2327

    
2328
  # FIXME: race condition on drbd migration to primary
2329

    
2330
  # 2nd pass, do only the primary node
2331
  for inst_disk in instance.disks:
2332
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2333
      if node != instance.primary_node:
2334
        continue
2335
      lu.cfg.SetDiskID(node_disk, node)
2336
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2337
      if result.failed or not result:
2338
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2339
                           " (is_primary=True, pass=2)",
2340
                           inst_disk.iv_name, node)
2341
        disks_ok = False
2342
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
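    # collect (node, instance-visible name, assemble result) tuples so the
    # caller can report the mapping described in the docstring above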
2343

    
2344
  # leave the disks configured for the primary node
2345
  # this is a workaround that would be fixed better by
2346
  # improving the logical/physical id handling
2347
  for disk in instance.disks:
2348
    lu.cfg.SetDiskID(disk, instance.primary_node)
2349

    
2350
  return disks_ok, device_info
2351

    
2352

    
2353
def _StartInstanceDisks(lu, instance, force):
2354
  """Start the disks of an instance.
2355

2356
  """
2357
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2358
                                           ignore_secondaries=force)
2359
  if not disks_ok:
2360
    _ShutdownInstanceDisks(lu, instance)
2361
    if force is not None and not force:
2362
      lu.proc.LogWarning("", hint="If the message above refers to a"
2363
                         " secondary node,"
2364
                         " you can retry the operation using '--force'.")
2365
    raise errors.OpExecError("Disk consistency error")
2366

    
2367

    
2368
class LUDeactivateInstanceDisks(NoHooksLU):
2369
  """Shutdown an instance's disks.
2370

2371
  """
2372
  _OP_REQP = ["instance_name"]
2373
  REQ_BGL = False
2374

    
2375
  def ExpandNames(self):
2376
    self._ExpandAndLockInstance()
2377
    self.needed_locks[locking.LEVEL_NODE] = []
2378
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2379

    
2380
  def DeclareLocks(self, level):
2381
    if level == locking.LEVEL_NODE:
2382
      self._LockInstancesNodes()
2383

    
2384
  def CheckPrereq(self):
2385
    """Check prerequisites.
2386

2387
    This checks that the instance is in the cluster.
2388

2389
    """
2390
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2391
    assert self.instance is not None, \
2392
      "Cannot retrieve locked instance %s" % self.op.instance_name
2393

    
2394
  def Exec(self, feedback_fn):
2395
    """Deactivate the disks
2396

2397
    """
2398
    instance = self.instance
2399
    _SafeShutdownInstanceDisks(self, instance)
2400

    
2401

    
2402
def _SafeShutdownInstanceDisks(lu, instance):
2403
  """Shutdown block devices of an instance.
2404

2405
  This function checks if an instance is running, before calling
2406
  _ShutdownInstanceDisks.
2407

2408
  """
2409
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2410
                                      [instance.hypervisor])
2411
  ins_l = ins_l[instance.primary_node]
2412
  if ins_l.failed or not isinstance(ins_l.data, list):
2413
    raise errors.OpExecError("Can't contact node '%s'" %
2414
                             instance.primary_node)
2415

    
2416
  if instance.name in ins_l.data:
2417
    raise errors.OpExecError("Instance is running, can't shutdown"
2418
                             " block devices.")
2419

    
2420
  _ShutdownInstanceDisks(lu, instance)
2421

    
2422

    
2423
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2424
  """Shutdown block devices of an instance.
2425

2426
  This does the shutdown on all nodes of the instance.
2427

2428
  Errors on the primary node are ignored only if ignore_primary is
  true.
2430

2431
  """
2432
  result = True
2433
  for disk in instance.disks:
2434
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2435
      lu.cfg.SetDiskID(top_disk, node)
2436
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2437
      if result.failed or not result.data:
2438
        logging.error("Could not shutdown block device %s on node %s",
2439
                      disk.iv_name, node)
2440
        if not ignore_primary or node != instance.primary_node:
2441
          result = False
2442
  return result
2443

    
2444

    
2445
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2446
  """Checks if a node has enough free memory.
2447

2448
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
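
  For example, L{LUStartupInstance} calls it as::

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         bep[constants.BE_MEMORY], instance.hypervisor)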
2452

2453
  @type lu: C{LogicalUnit}
2454
  @param lu: a logical unit from which we get configuration data
2455
  @type node: C{str}
2456
  @param node: the node to check
2457
  @type reason: C{str}
2458
  @param reason: string to use in the error message
2459
  @type requested: C{int}
2460
  @param requested: the amount of memory in MiB to check for
2461
  @type hypervisor: C{str}
2462
  @param hypervisor: the hypervisor to ask for memory stats
2463
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2464
      we cannot check the node
2465

2466
  """
2467
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2468
  nodeinfo[node].Raise()
2469
  free_mem = nodeinfo[node].data.get('memory_free')
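  # a missing or non-integer value means the node could not report its
  # free memory reliably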
2470
  if not isinstance(free_mem, int):
2471
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2472
                             " was '%s'" % (node, free_mem))
2473
  if requested > free_mem:
2474
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2475
                             " needed %s MiB, available %s MiB" %
2476
                             (node, reason, requested, free_mem))
2477

    
2478

    
2479
class LUStartupInstance(LogicalUnit):
2480
  """Starts an instance.
2481

2482
  """
2483
  HPATH = "instance-start"
2484
  HTYPE = constants.HTYPE_INSTANCE
2485
  _OP_REQP = ["instance_name", "force"]
2486
  REQ_BGL = False
2487

    
2488
  def ExpandNames(self):
2489
    self._ExpandAndLockInstance()
2490

    
2491
  def BuildHooksEnv(self):
2492
    """Build hooks env.
2493

2494
    This runs on master, primary and secondary nodes of the instance.
2495

2496
    """
2497
    env = {
2498
      "FORCE": self.op.force,
2499
      }
2500
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2501
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2502
          list(self.instance.secondary_nodes))
2503
    return env, nl, nl
2504

    
2505
  def CheckPrereq(self):
2506
    """Check prerequisites.
2507

2508
    This checks that the instance is in the cluster.
2509

2510
    """
2511
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2512
    assert self.instance is not None, \
2513
      "Cannot retrieve locked instance %s" % self.op.instance_name
2514

    
2515
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2516
    # check bridges existence
2517
    _CheckInstanceBridgesExist(self, instance)
2518

    
2519
    _CheckNodeFreeMemory(self, instance.primary_node,
2520
                         "starting instance %s" % instance.name,
2521
                         bep[constants.BE_MEMORY], instance.hypervisor)
2522

    
2523
  def Exec(self, feedback_fn):
2524
    """Start the instance.
2525

2526
    """
2527
    instance = self.instance
2528
    force = self.op.force
2529
    extra_args = getattr(self.op, "extra_args", "")
2530

    
2531
    self.cfg.MarkInstanceUp(instance.name)
2532

    
2533
    node_current = instance.primary_node
2534

    
2535
    _StartInstanceDisks(self, instance, force)
2536

    
2537
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2538
    if result.failed or not result.data:
2539
      _ShutdownInstanceDisks(self, instance)
2540
      raise errors.OpExecError("Could not start instance")
2541

    
2542

    
2543
class LURebootInstance(LogicalUnit):
2544
  """Reboot an instance.
2545

2546
  """
2547
  HPATH = "instance-reboot"
2548
  HTYPE = constants.HTYPE_INSTANCE
2549
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2550
  REQ_BGL = False
2551

    
2552
  def ExpandNames(self):
2553
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2554
                                   constants.INSTANCE_REBOOT_HARD,
2555
                                   constants.INSTANCE_REBOOT_FULL]:
2556
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2557
                                  (constants.INSTANCE_REBOOT_SOFT,
2558
                                   constants.INSTANCE_REBOOT_HARD,
2559
                                   constants.INSTANCE_REBOOT_FULL))
2560
    self._ExpandAndLockInstance()
2561

    
2562
  def BuildHooksEnv(self):
2563
    """Build hooks env.
2564

2565
    This runs on master, primary and secondary nodes of the instance.
2566

2567
    """
2568
    env = {
2569
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2570
      }
2571
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2572
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2573
          list(self.instance.secondary_nodes))
2574
    return env, nl, nl
2575

    
2576
  def CheckPrereq(self):
2577
    """Check prerequisites.
2578

2579
    This checks that the instance is in the cluster.
2580

2581
    """
2582
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2583
    assert self.instance is not None, \
2584
      "Cannot retrieve locked instance %s" % self.op.instance_name
2585

    
2586
    # check bridges existence
2587
    _CheckInstanceBridgesExist(self, instance)
2588

    
2589
  def Exec(self, feedback_fn):
2590
    """Reboot the instance.
2591

2592
    """
2593
    instance = self.instance
2594
    ignore_secondaries = self.op.ignore_secondaries
2595
    reboot_type = self.op.reboot_type
2596
    extra_args = getattr(self.op, "extra_args", "")
2597

    
2598
    node_current = instance.primary_node
2599

    
2600
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2601
                       constants.INSTANCE_REBOOT_HARD]:
2602
      result = self.rpc.call_instance_reboot(node_current, instance,
2603
                                             reboot_type, extra_args)
2604
      if result.failed or not result.data:
2605
        raise errors.OpExecError("Could not reboot instance")
2606
    else:
2607
      if not self.rpc.call_instance_shutdown(node_current, instance):
2608
        raise errors.OpExecError("could not shutdown instance for full reboot")
2609
      _ShutdownInstanceDisks(self, instance)
2610
      _StartInstanceDisks(self, instance, ignore_secondaries)
2611
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2612
      if result.failed or not result.data:
2613
        _ShutdownInstanceDisks(self, instance)
2614
        raise errors.OpExecError("Could not start instance for full reboot")
2615

    
2616
    self.cfg.MarkInstanceUp(instance.name)
2617

    
2618

    
2619
class LUShutdownInstance(LogicalUnit):
2620
  """Shutdown an instance.
2621

2622
  """
2623
  HPATH = "instance-stop"
2624
  HTYPE = constants.HTYPE_INSTANCE
2625
  _OP_REQP = ["instance_name"]
2626
  REQ_BGL = False
2627

    
2628
  def ExpandNames(self):
2629
    self._ExpandAndLockInstance()
2630

    
2631
  def BuildHooksEnv(self):
2632
    """Build hooks env.
2633

2634
    This runs on master, primary and secondary nodes of the instance.
2635

2636
    """
2637
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2638
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2639
          list(self.instance.secondary_nodes))
2640
    return env, nl, nl
2641

    
2642
  def CheckPrereq(self):
2643
    """Check prerequisites.
2644

2645
    This checks that the instance is in the cluster.
2646

2647
    """
2648
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2649
    assert self.instance is not None, \
2650
      "Cannot retrieve locked instance %s" % self.op.instance_name
2651

    
2652
  def Exec(self, feedback_fn):
2653
    """Shutdown the instance.
2654

2655
    """
2656
    instance = self.instance
2657
    node_current = instance.primary_node
2658
    self.cfg.MarkInstanceDown(instance.name)
2659
    result = self.rpc.call_instance_shutdown(node_current, instance)
2660
    if result.failed or not result.data:
2661
      self.proc.LogWarning("Could not shutdown instance")
2662

    
2663
    _ShutdownInstanceDisks(self, instance)
2664

    
2665

    
2666
class LUReinstallInstance(LogicalUnit):
2667
  """Reinstall an instance.
2668

2669
  """
2670
  HPATH = "instance-reinstall"
2671
  HTYPE = constants.HTYPE_INSTANCE
2672
  _OP_REQP = ["instance_name"]
2673
  REQ_BGL = False
2674

    
2675
  def ExpandNames(self):
2676
    self._ExpandAndLockInstance()
2677

    
2678
  def BuildHooksEnv(self):
2679
    """Build hooks env.
2680

2681
    This runs on master, primary and secondary nodes of the instance.
2682

2683
    """
2684
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2685
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2686
          list(self.instance.secondary_nodes))
2687
    return env, nl, nl
2688

    
2689
  def CheckPrereq(self):
2690
    """Check prerequisites.
2691

2692
    This checks that the instance is in the cluster and is not running.
2693

2694
    """
2695
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2696
    assert instance is not None, \
2697
      "Cannot retrieve locked instance %s" % self.op.instance_name
2698

    
2699
    if instance.disk_template == constants.DT_DISKLESS:
2700
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2701
                                 self.op.instance_name)
2702
    if instance.status != "down":
2703
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2704
                                 self.op.instance_name)
2705
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2706
                                              instance.name,
2707
                                              instance.hypervisor)
2708
    if remote_info.failed or remote_info.data:
2709
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2710
                                 (self.op.instance_name,
2711
                                  instance.primary_node))
2712

    
2713
    self.op.os_type = getattr(self.op, "os_type", None)
2714
    if self.op.os_type is not None:
2715
      # OS verification
2716
      pnode = self.cfg.GetNodeInfo(
2717
        self.cfg.ExpandNodeName(instance.primary_node))
2718
      if pnode is None:
2719
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2720
                                   instance.primary_node)
2721
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2722
      result.Raise()
2723
      if not isinstance(result.data, objects.OS):
2724
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2725
                                   " primary node"  % self.op.os_type)
2726

    
2727
    self.instance = instance
2728

    
2729
  def Exec(self, feedback_fn):
2730
    """Reinstall the instance.
2731

2732
    """
2733
    inst = self.instance
2734

    
2735
    if self.op.os_type is not None:
2736
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2737
      inst.os = self.op.os_type
2738
      self.cfg.Update(inst)
2739

    
2740
    _StartInstanceDisks(self, inst, None)
2741
    try:
2742
      feedback_fn("Running the instance OS create scripts...")
2743
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2744
      result.Raise()
2745
      if not result.data:
2746
        raise errors.OpExecError("Could not install OS for instance %s"
2747
                                 " on node %s" %
2748
                                 (inst.name, inst.primary_node))
2749
    finally:
2750
      _ShutdownInstanceDisks(self, inst)
2751

    
2752

    
2753
class LURenameInstance(LogicalUnit):
2754
  """Rename an instance.
2755

2756
  """
2757
  HPATH = "instance-rename"
2758
  HTYPE = constants.HTYPE_INSTANCE
2759
  _OP_REQP = ["instance_name", "new_name"]
2760

    
2761
  def BuildHooksEnv(self):
2762
    """Build hooks env.
2763

2764
    This runs on master, primary and secondary nodes of the instance.
2765

2766
    """
2767
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2768
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2769
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2770
          list(self.instance.secondary_nodes))
2771
    return env, nl, nl
2772

    
2773
  def CheckPrereq(self):
2774
    """Check prerequisites.
2775

2776
    This checks that the instance is in the cluster and is not running.
2777

2778
    """
2779
    instance = self.cfg.GetInstanceInfo(
2780
      self.cfg.ExpandInstanceName(self.op.instance_name))
2781
    if instance is None:
2782
      raise errors.OpPrereqError("Instance '%s' not known" %
2783
                                 self.op.instance_name)
2784
    if instance.status != "down":
2785
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2786
                                 self.op.instance_name)
2787
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2788
                                              instance.name,
2789
                                              instance.hypervisor)
2790
    remote_info.Raise()
2791
    if remote_info.data:
2792
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2793
                                 (self.op.instance_name,
2794
                                  instance.primary_node))
2795
    self.instance = instance
2796

    
2797
    # new name verification
2798
    name_info = utils.HostInfo(self.op.new_name)
2799

    
2800
    self.op.new_name = new_name = name_info.name
2801
    instance_list = self.cfg.GetInstanceList()
2802
    if new_name in instance_list:
2803
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2804
                                 new_name)
2805

    
2806
    if not getattr(self.op, "ignore_ip", False):
2807
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2808
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2809
                                   (name_info.ip, new_name))
2810

    
2811

    
2812
  def Exec(self, feedback_fn):
2813
    """Reinstall the instance.
2814

2815
    """
2816
    inst = self.instance
2817
    old_name = inst.name
2818

    
2819
    if inst.disk_template == constants.DT_FILE:
2820
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
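      # for file-based instances the storage directory is derived from the
      # first disk's path and must be renamed on the primary node as well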
2821

    
2822
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2823
    # Change the instance lock. This is definitely safe while we hold the BGL
2824
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2825
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2826

    
2827
    # re-read the instance from the configuration after rename
2828
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2829

    
2830
    if inst.disk_template == constants.DT_FILE:
2831
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2832
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2833
                                                     old_file_storage_dir,
2834
                                                     new_file_storage_dir)
2835
      result.Raise()
2836
      if not result.data:
2837
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2838
                                 " directory '%s' to '%s' (but the instance"
2839
                                 " has been renamed in Ganeti)" % (
2840
                                 inst.primary_node, old_file_storage_dir,
2841
                                 new_file_storage_dir))
2842

    
2843
      if not result.data[0]:
2844
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2845
                                 " (but the instance has been renamed in"
2846
                                 " Ganeti)" % (old_file_storage_dir,
2847
                                               new_file_storage_dir))
2848

    
2849
    _StartInstanceDisks(self, inst, None)
2850
    try:
2851
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2852
                                                 old_name)
2853
      if result.failed or not result.data:
2854
        msg = ("Could not run OS rename script for instance %s on node %s"
2855
               " (but the instance has been renamed in Ganeti)" %
2856
               (inst.name, inst.primary_node))
2857
        self.proc.LogWarning(msg)
2858
    finally:
2859
      _ShutdownInstanceDisks(self, inst)
2860

    
2861

    
2862
class LURemoveInstance(LogicalUnit):
2863
  """Remove an instance.
2864

2865
  """
2866
  HPATH = "instance-remove"
2867
  HTYPE = constants.HTYPE_INSTANCE
2868
  _OP_REQP = ["instance_name", "ignore_failures"]
2869
  REQ_BGL = False
2870

    
2871
  def ExpandNames(self):
2872
    self._ExpandAndLockInstance()
2873
    self.needed_locks[locking.LEVEL_NODE] = []
2874
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2875

    
2876
  def DeclareLocks(self, level):
2877
    if level == locking.LEVEL_NODE:
2878
      self._LockInstancesNodes()
2879

    
2880
  def BuildHooksEnv(self):
2881
    """Build hooks env.
2882

2883
    This runs on master, primary and secondary nodes of the instance.
2884

2885
    """
2886
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2887
    nl = [self.cfg.GetMasterNode()]
2888
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    if result.failed or not result.data:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state", "admin_ram",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    "(disk).(size)/([0-9]+)",
                                    "(disk).(sizes)",
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
                                    "(nic).(macs|ips|bridges)",
                                    "(disk|nic).(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
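  # Comment added for clarity (not part of the original code): the static
  # fields above can be answered from the configuration alone, while the
  # dynamic fields ("oper_state", "oper_ram", "status") need live data from
  # the nodes; requesting any dynamic field makes ExpandNames below set
  # do_locking, which in turn triggers the call_all_instances_info RPC in
  # Exec.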


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
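          # Mapping computed below (explanatory comment, not original code):
          #   node offline          -> "ERROR_nodeoffline"
          #   node unreachable      -> "ERROR_nodedown"
          #   running & admin up    -> "running"
          #   running & admin down  -> "ERROR_up"
          #   stopped & admin up    -> "ERROR_down"
          #   stopped & admin down  -> "ADMIN_down"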
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    if result.failed or not result.data:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None)
      if result.failed or not result.data:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if new_id.failed or not new_id.data:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if new_id.failed or not new_id.data:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
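  # Illustrative example (added comment): for exts [".disk0_data",
  # ".disk0_meta"] this returns names such as
  # ["<uuid>.disk0_data", "<uuid>.disk0_meta"], with one fresh unique ID
  # generated per extension.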


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
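  # Resulting device tree (sketch added for clarity, not original code):
  #   LD_DRBD8  logical_id=(primary, secondary, port, p_minor, s_minor, secret)
  #    +- LD_LV  data volume, full requested size
  #    +- LD_LV  metadata volume, fixed at 128 MB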


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
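  # Summary of the layouts produced above (comment added for clarity):
  #   DT_DISKLESS: no disks; DT_PLAIN: one LV per disk named "<uuid>.disk<N>";
  #   DT_DRBD8: one DRBD8 device per disk (data + 128 MB meta LVs, two DRBD
  #   minors allocated per disk); DT_FILE: one file per disk under
  #   file_storage_dir.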


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if result.failed or not result.data:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result.data[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      result = lu.rpc.call_blockdev_remove(node, disk)
      if result.failed or not result.data:
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]
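  # Worked example (comment added for clarity): for two disks of 1024 MB each,
  # DT_PLAIN needs 1024 + 1024 = 2048 MB of free space in the volume group,
  # while DT_DRBD8 needs (1024 + 128) * 2 = 2304 MB because every disk carries
  # a 128 MB DRBD metadata volume; DT_DISKLESS and DT_FILE return None (no VG
  # space needed).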
3547

    
3548

    
3549
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3550
  """Hypervisor parameter validation.
3551

3552
  This function abstract the hypervisor parameter validation to be
3553
  used in both instance create and instance modify.
3554

3555
  @type lu: L{LogicalUnit}
3556
  @param lu: the logical unit for which we check
3557
  @type nodenames: list
3558
  @param nodenames: the list of nodes on which we should check
3559
  @type hvname: string
3560
  @param hvname: the name of the hypervisor we should use
3561
  @type hvparams: dict
3562
  @param hvparams: the parameters which we need to check
3563
  @raise errors.OpPrereqError: if the parameters are not valid
3564

3565
  """
3566
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3567
                                                  hvname,
3568
                                                  hvparams)
3569
  for node in nodenames:
3570
    info = hvinfo[node]
3571
    info.Raise()
3572
    if not info.data or not isinstance(info.data, (tuple, list)):
3573
      raise errors.OpPrereqError("Cannot get current information"
3574
                                 " from node '%s' (%s)" % (node, info.data))
3575
    if not info.data[0]:
3576
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3577
                                 " %s" % info.data[1])
3578

    
3579

    
3580
class LUCreateInstance(LogicalUnit):
3581
  """Create an instance.
3582

3583
  """
3584
  HPATH = "instance-add"
3585
  HTYPE = constants.HTYPE_INSTANCE
3586
  _OP_REQP = ["instance_name", "disks", "disk_template",
3587
              "mode", "start",
3588
              "wait_for_sync", "ip_check", "nics",
3589
              "hvparams", "beparams"]
3590
  REQ_BGL = False
3591

    
3592
  def _ExpandNode(self, node):
3593
    """Expands and checks one node name.
3594

3595
    """
3596
    node_full = self.cfg.ExpandNodeName(node)
3597
    if node_full is None:
3598
      raise errors.OpPrereqError("Unknown node %s" % node)
3599
    return node_full
3600

    
3601
  def ExpandNames(self):
3602
    """ExpandNames for CreateInstance.
3603

3604
    Figure out the right locks for instance creation.
3605

3606
    """
3607
    self.needed_locks = {}
3608

    
3609
    # set optional parameters to none if they don't exist
3610
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3611
      if not hasattr(self.op, attr):
3612
        setattr(self.op, attr, None)
3613

    
3614
    # cheap checks, mostly valid constants given
3615

    
3616
    # verify creation mode
3617
    if self.op.mode not in (constants.INSTANCE_CREATE,
3618
                            constants.INSTANCE_IMPORT):
3619
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3620
                                 self.op.mode)
3621

    
3622
    # disk template and mirror node verification
3623
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3624
      raise errors.OpPrereqError("Invalid disk template name")
3625

    
3626
    if self.op.hypervisor is None:
3627
      self.op.hypervisor = self.cfg.GetHypervisorType()
3628

    
3629
    cluster = self.cfg.GetClusterInfo()
3630
    enabled_hvs = cluster.enabled_hypervisors
3631
    if self.op.hypervisor not in enabled_hvs:
3632
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3633
                                 " cluster (%s)" % (self.op.hypervisor,
3634
                                  ",".join(enabled_hvs)))
3635

    
3636
    # check hypervisor parameter syntax (locally)
3637

    
3638
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3639
                                  self.op.hvparams)
3640
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3641
    hv_type.CheckParameterSyntax(filled_hvp)
3642

    
3643
    # fill and remember the beparams dict
3644
    utils.CheckBEParams(self.op.beparams)
3645
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3646
                                    self.op.beparams)
3647

    
3648
    #### instance parameters check
3649

    
3650
    # instance name verification
3651
    hostname1 = utils.HostInfo(self.op.instance_name)
3652
    self.op.instance_name = instance_name = hostname1.name
3653

    
3654
    # this is just a preventive check, but someone might still add this
3655
    # instance in the meantime, and creation will fail at lock-add time
3656
    if instance_name in self.cfg.GetInstanceList():
3657
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3658
                                 instance_name)
3659

    
3660
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3661

    
3662
    # NIC buildup
3663
    self.nics = []
3664
    for nic in self.op.nics:
3665
      # ip validity checks
3666
      ip = nic.get("ip", None)
3667
      if ip is None or ip.lower() == "none":
3668
        nic_ip = None
3669
      elif ip.lower() == constants.VALUE_AUTO:
3670
        nic_ip = hostname1.ip
3671
      else:
3672
        if not utils.IsValidIP(ip):
3673
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3674
                                     " like a valid IP" % ip)
3675
        nic_ip = ip
3676

    
3677
      # MAC address verification
3678
      mac = nic.get("mac", constants.VALUE_AUTO)
3679
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3680
        if not utils.IsValidMac(mac.lower()):
3681
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3682
                                     mac)
3683
      # bridge verification
3684
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3685
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3686

    
3687
    # disk checks/pre-build
3688
    self.disks = []
3689
    for disk in self.op.disks:
3690
      mode = disk.get("mode", constants.DISK_RDWR)
3691
      if mode not in constants.DISK_ACCESS_SET:
3692
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3693
                                   mode)
3694
      size = disk.get("size", None)
3695
      if size is None:
3696
        raise errors.OpPrereqError("Missing disk size")
3697
      try:
3698
        size = int(size)
3699
      except ValueError:
3700
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3701
      self.disks.append({"size": size, "mode": mode})
3702

    
3703
    # used in CheckPrereq for ip ping check
3704
    self.check_ip = hostname1.ip
3705

    
3706
    # file storage checks
3707
    if (self.op.file_driver and
3708
        not self.op.file_driver in constants.FILE_DRIVER):
3709
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3710
                                 self.op.file_driver)
3711

    
3712
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3713
      raise errors.OpPrereqError("File storage directory path not absolute")
3714

    
3715
    ### Node/iallocator related checks
3716
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3717
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3718
                                 " node must be given")
3719

    
3720
    if self.op.iallocator:
3721
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3722
    else:
3723
      self.op.pnode = self._ExpandNode(self.op.pnode)
3724
      nodelist = [self.op.pnode]
3725
      if self.op.snode is not None:
3726
        self.op.snode = self._ExpandNode(self.op.snode)
3727
        nodelist.append(self.op.snode)
3728
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3729

    
3730
    # in case of import lock the source node too
3731
    if self.op.mode == constants.INSTANCE_IMPORT:
3732
      src_node = getattr(self.op, "src_node", None)
3733
      src_path = getattr(self.op, "src_path", None)
3734

    
3735
      if src_path is None:
3736
        self.op.src_path = src_path = self.op.instance_name
3737

    
3738
      if src_node is None:
3739
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3740
        self.op.src_node = None
3741
        if os.path.isabs(src_path):
3742
          raise errors.OpPrereqError("Importing an instance from an absolute"
3743
                                     " path requires a source node option.")
3744
      else:
3745
        self.op.src_node = src_node = self._ExpandNode(src_node)
3746
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3747
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
3748
        if not os.path.isabs(src_path):
3749
          self.op.src_path = src_path = \
3750
            os.path.join(constants.EXPORT_DIR, src_path)
3751

    
3752
    else: # INSTANCE_CREATE
3753
      if getattr(self.op, "os_type", None) is None:
3754
        raise errors.OpPrereqError("No guest OS specified")
3755

    
3756
  def _RunAllocator(self):
3757
    """Run the allocator based on input opcode.
3758

3759
    """
3760
    nics = [n.ToDict() for n in self.nics]
3761
    ial = IAllocator(self,
3762
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3763
                     name=self.op.instance_name,
3764
                     disk_template=self.op.disk_template,
3765
                     tags=[],
3766
                     os=self.op.os_type,
3767
                     vcpus=self.be_full[constants.BE_VCPUS],
3768
                     mem_size=self.be_full[constants.BE_MEMORY],
3769
                     disks=self.disks,
3770
                     nics=nics,
3771
                     hypervisor=self.op.hypervisor,
3772
                     )
3773

    
3774
    ial.Run(self.op.iallocator)
3775

    
3776
    if not ial.success:
3777
      raise errors.OpPrereqError("Can't compute nodes using"
3778
                                 " iallocator '%s': %s" % (self.op.iallocator,
3779
                                                           ial.info))
3780
    if len(ial.nodes) != ial.required_nodes:
3781
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3782
                                 " of nodes (%s), required %s" %
3783
                                 (self.op.iallocator, len(ial.nodes),
3784
                                  ial.required_nodes))
3785
    self.op.pnode = ial.nodes[0]
3786
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3787
                 self.op.instance_name, self.op.iallocator,
3788
                 ", ".join(ial.nodes))
3789
    if ial.required_nodes == 2:
3790
      self.op.snode = ial.nodes[1]
3791

    
3792
  def BuildHooksEnv(self):
3793
    """Build hooks env.
3794

3795
    This runs on master, primary and secondary nodes of the instance.
3796

3797
    """
3798
    env = {
3799
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3800
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3801
      "INSTANCE_ADD_MODE": self.op.mode,
3802
      }
3803
    if self.op.mode == constants.INSTANCE_IMPORT:
3804
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3805
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3806
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3807

    
3808
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3809
      primary_node=self.op.pnode,
3810
      secondary_nodes=self.secondaries,
3811
      status=self.instance_status,
3812
      os_type=self.op.os_type,
3813
      memory=self.be_full[constants.BE_MEMORY],
3814
      vcpus=self.be_full[constants.BE_VCPUS],
3815
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3816
    ))
3817

    
3818
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3819
          self.secondaries)
3820
    return env, nl, nl
3821

    
3822

    
3823
  def CheckPrereq(self):
3824
    """Check prerequisites.
3825

3826
    """
3827
    if (not self.cfg.GetVGName() and
3828
        self.op.disk_template not in constants.DTS_NOT_LVM):
3829
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3830
                                 " instances")
3831

    
3832

    
3833
    if self.op.mode == constants.INSTANCE_IMPORT:
3834
      src_node = self.op.src_node
3835
      src_path = self.op.src_path
3836

    
3837
      if src_node is None:
3838
        exp_list = self.rpc.call_export_list(
3839
          self.acquired_locks[locking.LEVEL_NODE])
3840
        found = False
3841
        for node in exp_list:
3842
          if not exp_list[node].failed and src_path in exp_list[node].data:
3843
            found = True
3844
            self.op.src_node = src_node = node
3845
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3846
                                                       src_path)
3847
            break
3848
        if not found:
3849
          raise errors.OpPrereqError("No export found for relative path %s" %
3850
                                      src_path)
3851

    
3852
      result = self.rpc.call_export_info(src_node, src_path)
3853
      result.Raise()
3854
      if not result.data:
3855
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3856

    
3857
      export_info = result.data
3858
      if not export_info.has_section(constants.INISECT_EXP):
3859
        raise errors.ProgrammerError("Corrupted export config")
3860

    
3861
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3862
      if (int(ei_version) != constants.EXPORT_VERSION):
3863
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3864
                                   (ei_version, constants.EXPORT_VERSION))
3865

    
3866
      # Check that the new instance doesn't have less disks than the export
3867
      instance_disks = len(self.disks)
3868
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3869
      if instance_disks < export_disks:
3870
        raise errors.OpPrereqError("Not enough disks to import."
3871
                                   " (instance: %d, export: %d)" %
3872
                                   (instance_disks, export_disks))
3873

    
3874
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3875
      disk_images = []
3876
      for idx in range(export_disks):
3877
        option = 'disk%d_dump' % idx
3878
        if export_info.has_option(constants.INISECT_INS, option):
3879
          # FIXME: are the old os-es, disk sizes, etc. useful?
3880
          export_name = export_info.get(constants.INISECT_INS, option)
3881
          image = os.path.join(src_path, export_name)
3882
          disk_images.append(image)
3883
        else:
3884
          disk_images.append(False)
3885

    
3886
      self.src_images = disk_images
3887

    
3888
      old_name = export_info.get(constants.INISECT_INS, 'name')
3889
      # FIXME: int() here could throw a ValueError on broken exports
3890
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3891
      if self.op.instance_name == old_name:
3892
        for idx, nic in enumerate(self.nics):
3893
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3894
            nic_mac_ini = 'nic%d_mac' % idx
3895
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3896

    
3897
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3898
    if self.op.start and not self.op.ip_check:
3899
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3900
                                 " adding an instance in start mode")
3901

    
3902
    if self.op.ip_check:
3903
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3904
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3905
                                   (self.check_ip, self.op.instance_name))
3906

    
3907
    #### allocator run
3908

    
3909
    if self.op.iallocator is not None:
3910
      self._RunAllocator()
3911

    
3912
    #### node related checks
3913

    
3914
    # check primary node
3915
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3916
    assert self.pnode is not None, \
3917
      "Cannot retrieve locked node %s" % self.op.pnode
3918
    self.secondaries = []
3919

    
3920
    # mirror node verification
3921
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3922
      if self.op.snode is None:
3923
        raise errors.OpPrereqError("The networked disk templates need"
3924
                                   " a mirror node")
3925
      if self.op.snode == pnode.name:
3926
        raise errors.OpPrereqError("The secondary node cannot be"
3927
                                   " the primary node.")
3928
      self.secondaries.append(self.op.snode)
3929

    
3930
    nodenames = [pnode.name] + self.secondaries
3931

    
3932
    req_size = _ComputeDiskSize(self.op.disk_template,
3933
                                self.disks)
3934

    
3935
    # Check lv size requirements
3936
    if req_size is not None:
3937
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3938
                                         self.op.hypervisor)
3939
      for node in nodenames:
3940
        info = nodeinfo[node]
3941
        info.Raise()
3942
        info = info.data
3943
        if not info:
3944
          raise errors.OpPrereqError("Cannot get current information"
3945
                                     " from node '%s'" % node)
3946
        vg_free = info.get('vg_free', None)
3947
        if not isinstance(vg_free, int):
3948
          raise errors.OpPrereqError("Can't compute free disk space on"
3949
                                     " node %s" % node)
3950
        if req_size > info['vg_free']:
3951
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3952
                                     " %d MB available, %d MB required" %
3953
                                     (node, info['vg_free'], req_size))
3954

    
3955
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3956

    
3957
    # os verification
3958
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3959
    result.Raise()
3960
    if not isinstance(result.data, objects.OS):
3961
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3962
                                 " primary node"  % self.op.os_type)
3963

    
3964
    # bridge check on primary node
3965
    bridges = [n.bridge for n in self.nics]
3966
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3967
    result.Raise()
3968
    if not result.data:
3969
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
3970
                                 " exist on destination node '%s'" %
3971
                                 (",".join(bridges), pnode.name))
3972

    
3973
    # memory check on primary node
3974
    if self.op.start:
3975
      _CheckNodeFreeMemory(self, self.pnode.name,
3976
                           "creating instance %s" % self.op.instance_name,
3977
                           self.be_full[constants.BE_MEMORY],
3978
                           self.op.hypervisor)
3979

    
3980
    if self.op.start:
3981
      self.instance_status = 'up'
3982
    else:
3983
      self.instance_status = 'down'
3984

    
3985
  def Exec(self, feedback_fn):
3986
    """Create and add the instance to the cluster.
3987

3988
    """
3989
    instance = self.op.instance_name
3990
    pnode_name = self.pnode.name
3991

    
3992
    for nic in self.nics:
3993
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3994
        nic.mac = self.cfg.GenerateMAC()
3995

    
3996
    ht_kind = self.op.hypervisor
3997
    if ht_kind in constants.HTS_REQ_PORT:
3998
      network_port = self.cfg.AllocatePort()
3999
    else:
4000
      network_port = None
4001

    
4002
    ##if self.op.vnc_bind_address is None:
4003
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4004

    
4005
    # this is needed because os.path.join does not accept None arguments
4006
    if self.op.file_storage_dir is None:
4007
      string_file_storage_dir = ""
4008
    else:
4009
      string_file_storage_dir = self.op.file_storage_dir
4010

    
4011
    # build the full file storage dir path
4012
    file_storage_dir = os.path.normpath(os.path.join(
4013
                                        self.cfg.GetFileStorageDir(),
4014
                                        string_file_storage_dir, instance))
4015

    
4016

    
4017
    disks = _GenerateDiskTemplate(self,
4018
                                  self.op.disk_template,
4019
                                  instance, pnode_name,
4020
                                  self.secondaries,
4021
                                  self.disks,
4022
                                  file_storage_dir,
4023
                                  self.op.file_driver,
4024
                                  0)
4025

    
4026
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4027
                            primary_node=pnode_name,
4028
                            nics=self.nics, disks=disks,
4029
                            disk_template=self.op.disk_template,
4030
                            status=self.instance_status,
4031
                            network_port=network_port,
4032
                            beparams=self.op.beparams,
4033
                            hvparams=self.op.hvparams,
4034
                            hypervisor=self.op.hypervisor,
4035
                            )
4036

    
4037
    feedback_fn("* creating instance disks...")
4038
    if not _CreateDisks(self, iobj):
4039
      _RemoveDisks(self, iobj)
4040
      self.cfg.ReleaseDRBDMinors(instance)
4041
      raise errors.OpExecError("Device creation failed, reverting...")
4042

    
4043
    feedback_fn("adding instance %s to cluster config" % instance)
4044

    
4045
    self.cfg.AddInstance(iobj)
4046
    # Declare that we don't want to remove the instance lock anymore, as we've
4047
    # added the instance to the config
4048
    del self.remove_locks[locking.LEVEL_INSTANCE]
4049
    # Remove the temp. assignements for the instance's drbds
4050
    self.cfg.ReleaseDRBDMinors(instance)
4051
    # Unlock all the nodes
4052
    if self.op.mode == constants.INSTANCE_IMPORT:
4053
      nodes_keep = [self.op.src_node]
4054
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4055
                       if node != self.op.src_node]
4056
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4057
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4058
    else:
4059
      self.context.glm.release(locking.LEVEL_NODE)
4060
      del self.acquired_locks[locking.LEVEL_NODE]
4061

    
4062
    if self.op.wait_for_sync:
4063
      disk_abort = not _WaitForSync(self, iobj)
4064
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4065
      # make sure the disks are not degraded (still sync-ing is ok)
4066
      time.sleep(15)
4067
      feedback_fn("* checking mirrors status")
4068
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4069
    else:
4070
      disk_abort = False
4071

    
4072
    if disk_abort:
4073
      _RemoveDisks(self, iobj)
4074
      self.cfg.RemoveInstance(iobj.name)
4075
      # Make sure the instance lock gets removed
4076
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4077
      raise errors.OpExecError("There are some degraded disks for"
4078
                               " this instance")
4079

    
4080
    feedback_fn("creating os for instance %s on node %s" %
4081
                (instance, pnode_name))
4082

    
4083
    if iobj.disk_template != constants.DT_DISKLESS:
4084
      if self.op.mode == constants.INSTANCE_CREATE:
4085
        feedback_fn("* running the instance OS create scripts...")
4086
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4087
        result.Raise()
4088
        if not result.data:
4089
          raise errors.OpExecError("Could not add os for instance %s"
4090
                                   " on node %s" %
4091
                                   (instance, pnode_name))
4092

    
4093
      elif self.op.mode == constants.INSTANCE_IMPORT:
4094
        feedback_fn("* running the instance OS import scripts...")
4095
        src_node = self.op.src_node
4096
        src_images = self.src_images
4097
        cluster_name = self.cfg.GetClusterName()
4098
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4099
                                                         src_node, src_images,
4100
                                                         cluster_name)
4101
        import_result.Raise()
4102
        for idx, result in enumerate(import_result.data):
4103
          if not result:
4104
            self.LogWarning("Could not import the image %s for instance"
4105
                            " %s, disk %d, on node %s" %
4106
                            (src_images[idx], instance, idx, pnode_name))
4107
      else:
4108
        # also checked in the prereq part
4109
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4110
                                     % self.op.mode)
4111

    
4112
    if self.op.start:
4113
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4114
      feedback_fn("* starting instance...")
4115
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
4116
      result.Raise()
4117
      if not result.data:
4118
        raise errors.OpExecError("Could not start instance")
4119

    
4120

    
4121
class LUConnectConsole(NoHooksLU):
4122
  """Connect to an instance's console.
4123

4124
  This is somewhat special in that it returns the command line that
4125
  you need to run on the master node in order to connect to the
4126
  console.
4127

4128
  """
4129
  _OP_REQP = ["instance_name"]
4130
  REQ_BGL = False
4131

    
4132
  def ExpandNames(self):
4133
    self._ExpandAndLockInstance()
4134

    
4135
  def CheckPrereq(self):
4136
    """Check prerequisites.
4137

4138
    This checks that the instance is in the cluster.
4139

4140
    """
4141
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4142
    assert self.instance is not None, \
4143
      "Cannot retrieve locked instance %s" % self.op.instance_name
4144

    
4145
  def Exec(self, feedback_fn):
4146
    """Connect to the console of an instance
4147

4148
    """
4149
    instance = self.instance
4150
    node = instance.primary_node
4151

    
4152
    node_insts = self.rpc.call_instance_list([node],
4153
                                             [instance.hypervisor])[node]
4154
    node_insts.Raise()
4155

    
4156
    if instance.name not in node_insts.data:
4157
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4158

    
4159
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4160

    
4161
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4162
    console_cmd = hyper.GetShellCommandForConsole(instance)
4163

    
4164
    # build ssh cmdline
4165
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4166

    
4167

    
4168
class LUReplaceDisks(LogicalUnit):
4169
  """Replace the disks of an instance.
4170

4171
  """
4172
  HPATH = "mirrors-replace"
4173
  HTYPE = constants.HTYPE_INSTANCE
4174
  _OP_REQP = ["instance_name", "mode", "disks"]
4175
  REQ_BGL = False
4176

    
4177
  def ExpandNames(self):
4178
    self._ExpandAndLockInstance()
4179

    
4180
    if not hasattr(self.op, "remote_node"):
4181
      self.op.remote_node = None
4182

    
4183
    ia_name = getattr(self.op, "iallocator", None)
4184
    if ia_name is not None:
4185
      if self.op.remote_node is not None:
4186
        raise errors.OpPrereqError("Give either the iallocator or the new"
4187
                                   " secondary, not both")
4188
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4189
    elif self.op.remote_node is not None:
4190
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4191
      if remote_node is None:
4192
        raise errors.OpPrereqError("Node '%s' not known" %
4193
                                   self.op.remote_node)
4194
      self.op.remote_node = remote_node
4195
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4196
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4197
    else:
4198
      self.needed_locks[locking.LEVEL_NODE] = []
4199
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4200

    
4201
  def DeclareLocks(self, level):
4202
    # If we're not already locking all nodes in the set we have to declare the
4203
    # instance's primary/secondary nodes.
4204
    if (level == locking.LEVEL_NODE and
4205
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4206
      self._LockInstancesNodes()
4207

    
4208
  def _RunAllocator(self):
4209
    """Compute a new secondary node using an IAllocator.
4210

4211
    """
4212
    ial = IAllocator(self,
4213
                     mode=constants.IALLOCATOR_MODE_RELOC,
4214
                     name=self.op.instance_name,
4215
                     relocate_from=[self.sec_node])
4216

    
4217
    ial.Run(self.op.iallocator)
4218

    
4219
    if not ial.success:
4220
      raise errors.OpPrereqError("Can't compute nodes using"
4221
                                 " iallocator '%s': %s" % (self.op.iallocator,
4222
                                                           ial.info))
4223
    if len(ial.nodes) != ial.required_nodes:
4224
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4225
                                 " of nodes (%s), required %s" %
4226
                                 (len(ial.nodes), ial.required_nodes))
4227
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find disk/%d on node %s" %
                                   (idx, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not find_res.failed and find_res.data is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      if result.failed or not result.data:
        for new_lv in new_lvs:
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
          if result.failed or not result.data:
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if result.failed or result.data[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
        if result.failed or not result.data:
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't find disk/%d on node %s" %
                                 (idx, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      size = dev.size
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[idx] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
      if result.failed or not result.data:
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for idx, dev in enumerate(instance.disks):
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      result = self.rpc.call_blockdev_find(pri_node, dev)
      if not result.failed and result.data:
        done += 1
      else:
        warning("Failed to detach drbd disk/%d from network, unusual case" %
                idx)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for idx, dev in enumerate(instance.disks):
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      if result.failed or not result.data:
        warning("can't attach drbd disk/%d to new secondary!" % idx,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      result.Raise()
      if result.data[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        result = self.rpc.call_blockdev_remove(old_node, lv)
        if result.failed or not result.data:
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


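# Illustrative sketch only (not called by LUReplaceDisks above): how the new
# DRBD8 logical_id is derived in _ExecD8Secondary when the secondary node is
# swapped out.  The helper name and its standalone form are assumptions made
# for the example; the tuple layout (lhost, rhost, port, lminor, rminor,
# secret) follows the indices used in the LU.
def _ExampleNewDrbd8LogicalId(logical_id, pri_node, new_node, new_minor):
  """Return a copy of logical_id with the secondary replaced by new_node."""
  lhost, rhost, port, lminor, rminor, secret = logical_id
  if pri_node == lhost:
    # the primary is the "left" host: keep its minor, replace the right side
    return (pri_node, new_node, port, lminor, new_minor, secret)
  else:
    # the primary is the "right" host: replace the left side
    return (new_node, pri_node, port, new_minor, rminor, secret)

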
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      result.Raise()
      if (not result.data or not isinstance(result.data, (list, tuple)) or
          len(result.data) != 2):
        raise errors.OpExecError("Grow request failed to node %s" % node)
      elif not result.data[0]:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, result.data[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


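# Illustrative sketch only: the per-node free-space check that
# LUGrowDisk.CheckPrereq performs above, pulled out as a standalone helper.
# The helper name is an assumption; 'vg_free' (in MiB) mirrors the node_info
# RPC field used by the LU.
def _ExampleCheckVGSpace(node_info_data, amount):
  """Return an error string if 'amount' MiB does not fit, else None."""
  vg_free = node_info_data.get('vg_free', None)
  if not isinstance(vg_free, int):
    return "cannot compute free disk space"
  if amount > vg_free:
    return ("not enough space: %d MiB available, %d MiB required" %
            (vg_free, amount))
  return None

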
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" %
                                     self.op.instance_name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      dev_pstatus.Raise()
      dev_pstatus = dev_pstatus.data
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      dev_sstatus.Raise()
      dev_sstatus = dev_sstatus.data
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


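# Illustrative sketch only: consuming the nested dicts that
# _ComputeDiskStatus builds above.  The generator below just walks the
# "children" recursion and yields indented one-line summaries; the helper
# name and the output format are assumptions made for the example.
def _ExampleWalkDiskStatus(dev_status, indent=0):
  """Yield '<indent><iv_name> (<dev_type>)' lines for a disk status tree."""
  yield "%s%s (%s)" % (" " * indent, dev_status["iv_name"],
                       dev_status["dev_type"])
  for child in dev_status["children"]:
    for line in _ExampleWalkDiskStatus(child, indent + 2):
      yield line

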
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    utils.CheckBEParams(self.op.beparams)

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == "none":
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
      # we can only check None bridges and assign the default one
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is None:
        nic_dict['bridge'] = self.cfg.GetDefBridge()
      # but we can validate MACs
      nic_mac = nic_dict.get('mac', None)
      if nic_mac is not None:
        if self.cfg.IsMacInUse(nic_mac):
          raise errors.OpPrereqError("MAC address %s already in use"
                                     " in cluster" % nic_mac)
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # FIXME: readd disk/nic changes
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        elif val == constants.VALUE_NONE:
          i_hvdict[key] = None
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = instance_info.data['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        # check the free memory on the secondary nodes, using the node info
        # gathered above (secondary_nodes itself is a plain tuple of names)
        for node, nres in nodeinfo.iteritems():
          if node == pnode:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is not None:
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if not type(ins_l) is list:
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          # use a separate name so we don't clobber the 'result' list
          rpc_result = self.rpc.call_blockdev_remove(node, disk)
          if rpc_result.failed or not rpc_result.data:
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                 " continuing anyway", device_idx, node)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        new_disk.mode = disk_dict['mode']
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for secondary_node in instance.secondary_nodes:
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
                                            new_disk, False, info):
            self.LogWarning("Failed to create volume %s (%s) on"
                            " secondary node %s!",
                            new_disk.iv_name, new_disk, secondary_node)
        #HARDCODE
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
                                        instance, new_disk, info):
          self.LogWarning("Failed to create volume %s on primary!",
                          new_disk.iv_name)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # add a new nic
        if 'mac' not in nic_dict:
          mac = constants.VALUE_GENERATE
        else:
          mac = nic_dict['mac']
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          mac = self.cfg.GenerateMAC()
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=nic_dict.get('bridge', None))
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


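# Illustrative sketch only: the merge semantics used by
# LUSetInstanceParams.CheckPrereq above for hvparams/beparams.  VALUE_DEFAULT
# drops a key so the cluster-level default shows through again, VALUE_NONE
# stores an explicit None, anything else overrides.  The helper name is an
# assumption made for the example.
def _ExampleApplyParamChanges(current, changes):
  """Return a copy of 'current' with 'changes' applied."""
  new_params = copy.deepcopy(current)
  for key, val in changes.iteritems():
    if val == constants.VALUE_DEFAULT:
      new_params.pop(key, None)
    elif val == constants.VALUE_NONE:
      new_params[key] = None
    else:
      new_params[key] = val
  return new_params

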
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


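# Illustrative sketch only: consuming the node->(export list | False) mapping
# returned by LUQueryExports.Exec above.  The helper name is an assumption;
# a False value marks a node whose export-list RPC failed.
def _ExampleNodesHoldingExport(export_map, instance_name):
  """Return the nodes on which an export for instance_name was found."""
  nodes = []
  for node, exports in export_map.iteritems():
    if exports is False:
      continue
    if instance_name in exports:
      nodes.append(node)
  return nodes

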
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        result = self.rpc.call_instance_start(src_node, instance, None)
        if result.failed or not result.data:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        result = self.rpc.call_blockdev_remove(src_node, dev)
        if result.failed or not result.data:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
5535
  """Remove exports related to the named instance.
5536

5537
  """
5538
  _OP_REQP = ["instance_name"]
5539
  REQ_BGL = False
5540

    
5541
  def ExpandNames(self):
5542
    self.needed_locks = {}
5543
    # We need all nodes to be locked in order for RemoveExport to work, but we
5544
    # don't need to lock the instance itself, as nothing will happen to it (and
5545
    # we can remove exports also for a removed instance)
5546
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5547

    
5548
  def CheckPrereq(self):
5549
    """Check prerequisites.
5550
    """
5551
    pass
5552

    
5553
  def Exec(self, feedback_fn):
5554
    """Remove any export.
5555

5556
    """
5557
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5558
    # If the instance was not found we'll try with the name that was passed in.
5559
    # This will only work if it was an FQDN, though.
5560
    fqdn_warn = False
5561
    if not instance_name:
5562
      fqdn_warn = True
5563
      instance_name = self.op.instance_name
5564

    
5565
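    # call_export_list returns a dict keyed by node name; each value is an
    # RPC result whose .data lists the export names present on that node, so
    # membership of instance_name below means an export exists there.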
    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the matching (path, tag) pairs.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
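    # results is a list of (path, tag) tuples, for example (illustrative
    # values) [("/instances/instance1.example.com", "web")]; the paths use
    # the /cluster, /nodes/<name> and /instances/<name> prefixes built above.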
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or _RELO_KEYS
      class attribute are required)
    - four buffer attributes (in|out_data|text) that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

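  # Typical construction (illustrative sketch; see LUTestAllocator.Exec below
  # for the real call sites):
  #   IAllocator(lu, mode=constants.IALLOCATOR_MODE_RELOC,
  #              name="instance1.example.com", relocate_from=["node2"])
  # passing exactly the keys listed in _RELO_KEYS (or _ALLO_KEYS when
  # allocating a new instance).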
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
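    # node_data maps node name to an RPC result whose .data dict must carry
    # memory_total, memory_free, memory_dom0, vg_size, vg_free and cpu_total
    # (validated below); node_iinfo maps node name to the per-instance
    # runtime info (e.g. 'memory') reported by that node.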
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # build the per-node result for the allocator input
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        "offline": ninfo.offline,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

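  # The resulting in_data has roughly this shape (illustrative excerpt):
  #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
  #    "enable_hypervisors": [...],
  #    "nodes": {"node1": {"total_memory": ..., "free_disk": ..., ...}, ...},
  #    "instances": {"inst1": {"memory": ..., "nics": [...], ...}, ...}}
  # plus the mode-specific "request" key added by the two methods below.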
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


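# For reference, a valid allocator reply parsed by IAllocator._ValidateResult
# above is a dict with at least the keys "success", "info" and "nodes", e.g.
# (illustrative values):
#   {"success": True, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}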
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result