#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
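

# The following class is an illustrative sketch, not part of the original
# module: it only demonstrates the ExpandNames/DeclareLocks calling pattern
# described in the docstrings above.  The class name and the opcode field are
# made up; the helpers (_ExpandAndLockInstance, _LockInstancesNodes) and the
# constants are the ones defined and used in this file.
class _LUExampleLockingSketch(LogicalUnit):
  """Hypothetical concurrent LU showing the locking helpers in use.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expand self.op.instance_name and declare the instance-level lock
    self._ExpandAndLockInstance()
    # node locks can only be computed once the instance lock is held,
    # so declare an empty list now and ask for recalculation later
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()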


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
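

# Illustrative sketch, not part of the original module: the environment that
# _BuildInstanceHookEnv above would produce for a hypothetical call (all names
# and values below are made up):
#
#   _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld",
#       ["node2.example.tld"], "debian-etch", "up", 512, 1,
#       [("198.51.100.10", "xen-br0", "aa:00:00:11:22:33")])
#
# would yield, among others:
#   INSTANCE_NAME=inst1.example.tld   INSTANCE_PRIMARY=node1.example.tld
#   INSTANCE_SECONDARIES=node2.example.tld
#   INSTANCE_OS_TYPE=debian-etch      INSTANCE_STATUS=up
#   INSTANCE_NIC_COUNT=1              INSTANCE_NIC0_IP=198.51.100.10
#   INSTANCE_NIC0_BRIDGE=xen-br0      INSTANCE_NIC0_HWADDR=aa:00:00:11:22:33
# (per the BuildHooksEnv docstring, the hooks runner adds the GANETI_ prefix).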


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,  # originally instance.os, which duplicated
                                # os_type; looks like a copy/paste slip
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad
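
  # Illustrative sketch, not part of the original module: the node_result
  # mapping consumed by _VerifyNode above, as implied by the keys it checks
  # (values are made-up placeholders):
  #
  #   node_result = {
  #     'filelist': {'<file path>': '<checksum>', ...},
  #     'nodelist': {},        # ssh failures, keyed by unreachable node name
  #     'node-net-test': {},   # tcp failures, keyed by unreachable node name
  #     'hypervisor': {'<hv name>': None},  # per-hypervisor verify errors
  #   }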

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
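
  # Illustrative worked example, not part of the original module, for the N+1
  # check above: suppose node C has mfree = 1200 and is secondary for two
  # auto-balanced instances whose primary is node A, with BE_MEMORY of 512 and
  # 1024.  Should A fail, C would need 512 + 1024 = 1536 to start both, so the
  # check reports that C cannot accommodate failovers from A.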

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase, and their failure
    makes the output be logged in the verify output and the verification to
    fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        # lvs is an error string here, so there is no LV mapping to iterate
        # over below; skip to the next node
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
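
  # Illustrative sketch, not part of the original module: the shape of the
  # value returned by Exec above, as assembled in the code:
  #
  #   (res_nodes,      # nodes that could not be contacted / returned bad data
  #    res_nlvm,       # dict: node name -> LVM error message
  #    res_instances,  # instance names having at least one offline LV
  #    res_missing)    # dict: instance name -> list of missing (node, lv)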


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logging.error("Copy of file %s to node %s failed", fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        logging.error("Could not re-enable the master role on the master,"
                      " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
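

# Note (illustrative, not part of the original module): the idx values 5 and 6
# used in _CheckDiskConsistency above are inferred to index the is_degraded
# and ldisk fields of the tuple returned by call_blockdev_find, matching the
# function's docstring; this is an inference from the code, not a documented
# RPC contract.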
1310

    
1311

    
1312
class LUDiagnoseOS(NoHooksLU):
1313
  """Logical unit for OS diagnose/query.
1314

1315
  """
1316
  _OP_REQP = ["output_fields", "names"]
1317
  REQ_BGL = False
1318

    
1319
  def ExpandNames(self):
1320
    if self.op.names:
1321
      raise errors.OpPrereqError("Selective OS query not supported")
1322

    
1323
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1324
    _CheckOutputFields(static=[],
1325
                       dynamic=self.dynamic_fields,
1326
                       selected=self.op.output_fields)
1327

    
1328
    # Lock all nodes, in shared mode
1329
    self.needed_locks = {}
1330
    self.share_locks[locking.LEVEL_NODE] = 1
1331
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1332

    
1333
  def CheckPrereq(self):
1334
    """Check prerequisites.
1335

1336
    """
1337

    
1338
  @staticmethod
1339
  def _DiagnoseByOS(node_list, rlist):
1340
    """Remaps a per-node return list into an a per-os per-node dictionary
1341

1342
      Args:
1343
        node_list: a list with the names of all nodes
1344
        rlist: a map with node names as keys and OS objects as values
1345

1346
      Returns:
1347
        map: a map with osnames as keys and as value another map, with
1348
             nodes as
1349
             keys and list of OS objects as values
1350
             e.g. {"debian-etch": {"node1": [<object>,...],
1351
                                   "node2": [<object>,]}
1352
                  }
1353

1354
    """
1355
    all_os = {}
1356
    for node_name, nr in rlist.iteritems():
1357
      if not nr:
1358
        continue
1359
      for os_obj in nr:
1360
        if os_obj.name not in all_os:
1361
          # build a list of nodes for this os containing empty lists
1362
          # for each node in node_list
1363
          all_os[os_obj.name] = {}
1364
          for nname in node_list:
1365
            all_os[os_obj.name][nname] = []
1366
        all_os[os_obj.name][node_name].append(os_obj)
1367
    return all_os
1368

    
1369
  def Exec(self, feedback_fn):
1370
    """Compute the list of OSes.
1371

1372
    """
1373
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1374
    node_data = self.rpc.call_os_diagnose(node_list)
1375
    if node_data == False:
1376
      raise errors.OpExecError("Can't gather the list of OSes")
1377
    pol = self._DiagnoseByOS(node_list, node_data)
1378
    output = []
1379
    for os_name, os_data in pol.iteritems():
1380
      row = []
1381
      for field in self.op.output_fields:
1382
        if field == "name":
1383
          val = os_name
1384
        elif field == "valid":
1385
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1386
        elif field == "node_status":
1387
          val = {}
1388
          for node_name, nos_list in os_data.iteritems():
1389
            val[node_name] = [(v.status, v.path) for v in nos_list]
1390
        else:
1391
          raise errors.ParameterError(field)
1392
        row.append(val)
1393
      output.append(row)
1394

    
1395
    return output
1396

    
1397

    
1398
class LURemoveNode(LogicalUnit):
1399
  """Logical unit for removing a node.
1400

1401
  """
1402
  HPATH = "node-remove"
1403
  HTYPE = constants.HTYPE_NODE
1404
  _OP_REQP = ["node_name"]
1405

    
1406
  def BuildHooksEnv(self):
1407
    """Build hooks env.
1408

1409
    This doesn't run on the target node in the pre phase as a failed
1410
    node would then be impossible to remove.
1411

1412
    """
1413
    env = {
1414
      "OP_TARGET": self.op.node_name,
1415
      "NODE_NAME": self.op.node_name,
1416
      }
1417
    all_nodes = self.cfg.GetNodeList()
1418
    all_nodes.remove(self.op.node_name)
1419
    return env, all_nodes, all_nodes
1420

    
1421
  def CheckPrereq(self):
1422
    """Check prerequisites.
1423

1424
    This checks:
1425
     - the node exists in the configuration
1426
     - it does not have primary or secondary instances
1427
     - it's not the master
1428

1429
    Any errors are signalled by raising errors.OpPrereqError.
1430

1431
    """
1432
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1433
    if node is None:
1434
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1435

    
1436
    instance_list = self.cfg.GetInstanceList()
1437

    
1438
    masternode = self.cfg.GetMasterNode()
1439
    if node.name == masternode:
1440
      raise errors.OpPrereqError("Node is the master node,"
1441
                                 " you need to failover first.")
1442

    
1443
    for instance_name in instance_list:
1444
      instance = self.cfg.GetInstanceInfo(instance_name)
1445
      if node.name == instance.primary_node:
1446
        raise errors.OpPrereqError("Instance %s still running on the node,"
1447
                                   " please remove first." % instance_name)
1448
      if node.name in instance.secondary_nodes:
1449
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1450
                                   " please remove first." % instance_name)
1451
    self.op.node_name = node.name
1452
    self.node = node
1453

    
1454
  def Exec(self, feedback_fn):
1455
    """Removes the node from the cluster.
1456

1457
    """
1458
    node = self.node
1459
    logging.info("Stopping the node daemon and removing configs from node %s",
1460
                 node.name)
1461

    
1462
    self.context.RemoveNode(node.name)
1463

    
1464
    self.rpc.call_node_leave_cluster(node.name)
1465

    
1466

    
1467
class LUQueryNodes(NoHooksLU):
1468
  """Logical unit for querying nodes.
1469

1470
  """
1471
  _OP_REQP = ["output_fields", "names"]
1472
  REQ_BGL = False
1473

    
1474
  def ExpandNames(self):
1475
    self.dynamic_fields = frozenset([
1476
      "dtotal", "dfree",
1477
      "mtotal", "mnode", "mfree",
1478
      "bootid",
1479
      "ctotal",
1480
      ])
1481

    
1482
    self.static_fields = frozenset([
1483
      "name", "pinst_cnt", "sinst_cnt",
1484
      "pinst_list", "sinst_list",
1485
      "pip", "sip", "tags",
1486
      "serial_no",
1487
      ])
1488

    
1489
    _CheckOutputFields(static=self.static_fields,
1490
                       dynamic=self.dynamic_fields,
1491
                       selected=self.op.output_fields)
1492

    
1493
    self.needed_locks = {}
1494
    self.share_locks[locking.LEVEL_NODE] = 1
1495

    
1496
    if self.op.names:
1497
      self.wanted = _GetWantedNodes(self, self.op.names)
1498
    else:
1499
      self.wanted = locking.ALL_SET
1500

    
1501
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1502
    if self.do_locking:
1503
      # if we don't request only static fields, we need to lock the nodes
1504
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
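    # Illustration (hypothetical field selections): requesting only "name"
    # and "pinst_cnt" leaves do_locking False, so no node locks are taken;
    # adding a live field such as "mfree" makes do_locking True and the
    # node locks above are acquired before Exec runs.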
1505

    
1506

    
1507
  def CheckPrereq(self):
1508
    """Check prerequisites.
1509

1510
    """
1511
    # The validation of the node list is done in _GetWantedNodes if the
1512
    # list is non-empty; if it is empty, there is no validation to do
1513
    pass
1514

    
1515
  def Exec(self, feedback_fn):
1516
    """Computes the list of nodes and their attributes.
1517

1518
    """
1519
    all_info = self.cfg.GetAllNodesInfo()
1520
    if self.do_locking:
1521
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1522
    elif self.wanted != locking.ALL_SET:
1523
      nodenames = self.wanted
1524
      missing = set(nodenames).difference(all_info.keys())
1525
      if missing:
1526
        raise errors.OpExecError(
1527
          "Some nodes were removed before retrieving their data: %s" % missing)
1528
    else:
1529
      nodenames = all_info.keys()
1530

    
1531
    nodenames = utils.NiceSort(nodenames)
1532
    nodelist = [all_info[name] for name in nodenames]
1533

    
1534
    # begin data gathering
1535

    
1536
    if self.dynamic_fields.intersection(self.op.output_fields):
1537
      live_data = {}
1538
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1539
                                          self.cfg.GetHypervisorType())
1540
      for name in nodenames:
1541
        nodeinfo = node_data.get(name, None)
1542
        if nodeinfo:
1543
          live_data[name] = {
1544
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1545
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1546
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1547
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1548
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1549
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1550
            "bootid": nodeinfo['bootid'],
1551
            }
1552
        else:
1553
          live_data[name] = {}
1554
    else:
1555
      live_data = dict.fromkeys(nodenames, {})
1556

    
1557
    node_to_primary = dict([(name, set()) for name in nodenames])
1558
    node_to_secondary = dict([(name, set()) for name in nodenames])
1559

    
1560
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1561
                             "sinst_cnt", "sinst_list"))
1562
    if inst_fields & frozenset(self.op.output_fields):
1563
      instancelist = self.cfg.GetInstanceList()
1564

    
1565
      for instance_name in instancelist:
1566
        inst = self.cfg.GetInstanceInfo(instance_name)
1567
        if inst.primary_node in node_to_primary:
1568
          node_to_primary[inst.primary_node].add(inst.name)
1569
        for secnode in inst.secondary_nodes:
1570
          if secnode in node_to_secondary:
1571
            node_to_secondary[secnode].add(inst.name)
1572

    
1573
    # end data gathering
1574

    
1575
    output = []
1576
    for node in nodelist:
1577
      node_output = []
1578
      for field in self.op.output_fields:
1579
        if field == "name":
1580
          val = node.name
1581
        elif field == "pinst_list":
1582
          val = list(node_to_primary[node.name])
1583
        elif field == "sinst_list":
1584
          val = list(node_to_secondary[node.name])
1585
        elif field == "pinst_cnt":
1586
          val = len(node_to_primary[node.name])
1587
        elif field == "sinst_cnt":
1588
          val = len(node_to_secondary[node.name])
1589
        elif field == "pip":
1590
          val = node.primary_ip
1591
        elif field == "sip":
1592
          val = node.secondary_ip
1593
        elif field == "tags":
1594
          val = list(node.GetTags())
1595
        elif field == "serial_no":
1596
          val = node.serial_no
1597
        elif field in self.dynamic_fields:
1598
          val = live_data[node.name].get(field, None)
1599
        else:
1600
          raise errors.ParameterError(field)
1601
        node_output.append(val)
1602
      output.append(node_output)
1603

    
1604
    return output
1605

    
1606

    
1607
class LUQueryNodeVolumes(NoHooksLU):
1608
  """Logical unit for getting volumes on node(s).
1609

1610
  """
1611
  _OP_REQP = ["nodes", "output_fields"]
1612
  REQ_BGL = False
1613

    
1614
  def ExpandNames(self):
1615
    _CheckOutputFields(static=["node"],
1616
                       dynamic=["phys", "vg", "name", "size", "instance"],
1617
                       selected=self.op.output_fields)
1618

    
1619
    self.needed_locks = {}
1620
    self.share_locks[locking.LEVEL_NODE] = 1
1621
    if not self.op.nodes:
1622
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1623
    else:
1624
      self.needed_locks[locking.LEVEL_NODE] = \
1625
        _GetWantedNodes(self, self.op.nodes)
1626

    
1627
  def CheckPrereq(self):
1628
    """Check prerequisites.
1629

1630
    This stores the list of locked nodes; the output fields have already
    been validated in ExpandNames.
1631

1632
    """
1633
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1634

    
1635
  def Exec(self, feedback_fn):
1636
    """Computes the list of nodes and their attributes.
1637

1638
    """
1639
    nodenames = self.nodes
1640
    volumes = self.rpc.call_node_volumes(nodenames)
1641

    
1642
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1643
             in self.cfg.GetInstanceList()]
1644

    
1645
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1646

    
1647
    output = []
1648
    for node in nodenames:
1649
      if node not in volumes or not volumes[node]:
1650
        continue
1651

    
1652
      node_vols = volumes[node][:]
1653
      node_vols.sort(key=lambda vol: vol['dev'])
1654

    
1655
      for vol in node_vols:
1656
        node_output = []
1657
        for field in self.op.output_fields:
1658
          if field == "node":
1659
            val = node
1660
          elif field == "phys":
1661
            val = vol['dev']
1662
          elif field == "vg":
1663
            val = vol['vg']
1664
          elif field == "name":
1665
            val = vol['name']
1666
          elif field == "size":
1667
            val = int(float(vol['size']))
1668
          elif field == "instance":
1669
            for inst in ilist:
1670
              if node not in lv_by_node[inst]:
1671
                continue
1672
              if vol['name'] in lv_by_node[inst][node]:
1673
                val = inst.name
1674
                break
1675
            else:
1676
              val = '-'
1677
          else:
1678
            raise errors.ParameterError(field)
1679
          node_output.append(str(val))
1680

    
1681
        output.append(node_output)
1682

    
1683
    return output
1684

    
1685

    
1686
class LUAddNode(LogicalUnit):
1687
  """Logical unit for adding node to the cluster.
1688

1689
  """
1690
  HPATH = "node-add"
1691
  HTYPE = constants.HTYPE_NODE
1692
  _OP_REQP = ["node_name"]
1693

    
1694
  def BuildHooksEnv(self):
1695
    """Build hooks env.
1696

1697
    This will run on all nodes before, and on all nodes + the new node after.
1698

1699
    """
1700
    env = {
1701
      "OP_TARGET": self.op.node_name,
1702
      "NODE_NAME": self.op.node_name,
1703
      "NODE_PIP": self.op.primary_ip,
1704
      "NODE_SIP": self.op.secondary_ip,
1705
      }
1706
    nodes_0 = self.cfg.GetNodeList()
1707
    nodes_1 = nodes_0 + [self.op.node_name, ]
1708
    return env, nodes_0, nodes_1
1709

    
1710
  def CheckPrereq(self):
1711
    """Check prerequisites.
1712

1713
    This checks:
1714
     - the new node is not already in the config
1715
     - it is resolvable
1716
     - its parameters (single/dual homed) matches the cluster
1717

1718
    Any errors are signalled by raising errors.OpPrereqError.
1719

1720
    """
1721
    node_name = self.op.node_name
1722
    cfg = self.cfg
1723

    
1724
    dns_data = utils.HostInfo(node_name)
1725

    
1726
    node = dns_data.name
1727
    primary_ip = self.op.primary_ip = dns_data.ip
1728
    secondary_ip = getattr(self.op, "secondary_ip", None)
1729
    if secondary_ip is None:
1730
      secondary_ip = primary_ip
1731
    if not utils.IsValidIP(secondary_ip):
1732
      raise errors.OpPrereqError("Invalid secondary IP given")
1733
    self.op.secondary_ip = secondary_ip
1734

    
1735
    node_list = cfg.GetNodeList()
1736
    if not self.op.readd and node in node_list:
1737
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1738
                                 node)
1739
    elif self.op.readd and node not in node_list:
1740
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1741

    
1742
    for existing_node_name in node_list:
1743
      existing_node = cfg.GetNodeInfo(existing_node_name)
1744

    
1745
      if self.op.readd and node == existing_node_name:
1746
        if (existing_node.primary_ip != primary_ip or
1747
            existing_node.secondary_ip != secondary_ip):
1748
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1749
                                     " address configuration as before")
1750
        continue
1751

    
1752
      if (existing_node.primary_ip == primary_ip or
1753
          existing_node.secondary_ip == primary_ip or
1754
          existing_node.primary_ip == secondary_ip or
1755
          existing_node.secondary_ip == secondary_ip):
1756
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1757
                                   " existing node %s" % existing_node.name)
1758

    
1759
    # check that the type of the node (single versus dual homed) is the
1760
    # same as for the master
1761
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1762
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1763
    newbie_singlehomed = secondary_ip == primary_ip
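    # the check below enforces that either both the master and the new node
    # have a separate secondary (replication) IP or neither of them does;
    # mixing single- and dual-homed nodes in one cluster is refused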
1764
    if master_singlehomed != newbie_singlehomed:
1765
      if master_singlehomed:
1766
        raise errors.OpPrereqError("The master has no private ip but the"
1767
                                   " new node has one")
1768
      else:
1769
        raise errors.OpPrereqError("The master has a private ip but the"
1770
                                   " new node doesn't have one")
1771

    
1772
    # checks reachability
1773
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1774
      raise errors.OpPrereqError("Node not reachable by ping")
1775

    
1776
    if not newbie_singlehomed:
1777
      # check reachability from my secondary ip to newbie's secondary ip
1778
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1779
                           source=myself.secondary_ip):
1780
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1781
                                   " based ping to noded port")
1782

    
1783
    self.new_node = objects.Node(name=node,
1784
                                 primary_ip=primary_ip,
1785
                                 secondary_ip=secondary_ip)
1786

    
1787
  def Exec(self, feedback_fn):
1788
    """Adds the new node to the cluster.
1789

1790
    """
1791
    new_node = self.new_node
1792
    node = new_node.name
1793

    
1794
    # check connectivity
1795
    result = self.rpc.call_version([node])[node]
1796
    if result:
1797
      if constants.PROTOCOL_VERSION == result:
1798
        logging.info("Communication to node %s fine, sw version %s match",
1799
                     node, result)
1800
      else:
1801
        raise errors.OpExecError("Version mismatch master version %s,"
1802
                                 " node version %s" %
1803
                                 (constants.PROTOCOL_VERSION, result))
1804
    else:
1805
      raise errors.OpExecError("Cannot get version from the new node")
1806

    
1807
    # setup ssh on node
1808
    logging.info("Copy ssh key to node %s", node)
1809
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1810
    keyarray = []
1811
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1812
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1813
                priv_key, pub_key]
1814

    
1815
    for i in keyfiles:
1816
      f = open(i, 'r')
1817
      try:
1818
        keyarray.append(f.read())
1819
      finally:
1820
        f.close()
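    # keyarray now holds the file contents in the same order as keyfiles:
    # host DSA private/public, host RSA private/public, and finally the
    # private and public SSH key of the ganeti run-as user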
1821

    
1822
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1823
                                    keyarray[2],
1824
                                    keyarray[3], keyarray[4], keyarray[5])
1825

    
1826
    if not result:
1827
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1828

    
1829
    # Add node to our /etc/hosts, and add key to known_hosts
1830
    utils.AddHostToEtcHosts(new_node.name)
1831

    
1832
    if new_node.secondary_ip != new_node.primary_ip:
1833
      if not self.rpc.call_node_has_ip_address(new_node.name,
1834
                                               new_node.secondary_ip):
1835
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1836
                                 " you gave (%s). Please fix and re-run this"
1837
                                 " command." % new_node.secondary_ip)
1838

    
1839
    node_verify_list = [self.cfg.GetMasterNode()]
1840
    node_verify_param = {
1841
      'nodelist': [node],
1842
      # TODO: do a node-net-test as well?
1843
    }
1844

    
1845
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1846
                                       self.cfg.GetClusterName())
1847
    for verifier in node_verify_list:
1848
      if not result[verifier]:
1849
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1850
                                 " for remote verification" % verifier)
1851
      if result[verifier]['nodelist']:
1852
        for failed in result[verifier]['nodelist']:
1853
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1854
                      (verifier, result[verifier]['nodelist'][failed]))
1855
        raise errors.OpExecError("ssh/hostname verification failed.")
1856

    
1857
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1858
    # including the node just added
1859
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1860
    dist_nodes = self.cfg.GetNodeList()
1861
    if not self.op.readd:
1862
      dist_nodes.append(node)
1863
    if myself.name in dist_nodes:
1864
      dist_nodes.remove(myself.name)
1865

    
1866
    logging.debug("Copying hosts and known_hosts to all nodes")
1867
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1868
      result = self.rpc.call_upload_file(dist_nodes, fname)
1869
      for to_node in dist_nodes:
1870
        if not result[to_node]:
1871
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1872

    
1873
    to_copy = []
1874
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1875
      to_copy.append(constants.VNC_PASSWORD_FILE)
1876
    for fname in to_copy:
1877
      result = self.rpc.call_upload_file([node], fname)
1878
      if not result[node]:
1879
        logging.error("Could not copy file %s to node %s", fname, node)
1880

    
1881
    if self.op.readd:
1882
      self.context.ReaddNode(new_node)
1883
    else:
1884
      self.context.AddNode(new_node)
1885

    
1886

    
1887
class LUQueryClusterInfo(NoHooksLU):
1888
  """Query cluster configuration.
1889

1890
  """
1891
  _OP_REQP = []
1892
  REQ_MASTER = False
1893
  REQ_BGL = False
1894

    
1895
  def ExpandNames(self):
1896
    self.needed_locks = {}
1897

    
1898
  def CheckPrereq(self):
1899
    """No prerequisites needed for this LU.
1900

1901
    """
1902
    pass
1903

    
1904
  def Exec(self, feedback_fn):
1905
    """Return cluster config.
1906

1907
    """
1908
    cluster = self.cfg.GetClusterInfo()
1909
    result = {
1910
      "software_version": constants.RELEASE_VERSION,
1911
      "protocol_version": constants.PROTOCOL_VERSION,
1912
      "config_version": constants.CONFIG_VERSION,
1913
      "os_api_version": constants.OS_API_VERSION,
1914
      "export_version": constants.EXPORT_VERSION,
1915
      "architecture": (platform.architecture()[0], platform.machine()),
1916
      "name": cluster.cluster_name,
1917
      "master": cluster.master_node,
1918
      "hypervisor_type": cluster.hypervisor,
1919
      "enabled_hypervisors": cluster.enabled_hypervisors,
1920
      "hvparams": cluster.hvparams,
1921
      "beparams": cluster.beparams,
1922
      }
1923

    
1924
    return result
1925

    
1926

    
1927
class LUQueryConfigValues(NoHooksLU):
1928
  """Return configuration values.
1929

1930
  """
1931
  _OP_REQP = []
1932
  REQ_BGL = False
1933

    
1934
  def ExpandNames(self):
1935
    self.needed_locks = {}
1936

    
1937
    static_fields = ["cluster_name", "master_node", "drain_flag"]
1938
    _CheckOutputFields(static=static_fields,
1939
                       dynamic=[],
1940
                       selected=self.op.output_fields)
1941

    
1942
  def CheckPrereq(self):
1943
    """No prerequisites.
1944

1945
    """
1946
    pass
1947

    
1948
  def Exec(self, feedback_fn):
1949
    """Dump a representation of the cluster config to the standard output.
1950

1951
    """
1952
    values = []
1953
    for field in self.op.output_fields:
1954
      if field == "cluster_name":
1955
        entry = self.cfg.GetClusterName()
1956
      elif field == "master_node":
1957
        entry = self.cfg.GetMasterNode()
1958
      elif field == "drain_flag":
1959
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1960
      else:
1961
        raise errors.ParameterError(field)
1962
      values.append(entry)
1963
    return values
1964

    
1965

    
1966
class LUActivateInstanceDisks(NoHooksLU):
1967
  """Bring up an instance's disks.
1968

1969
  """
1970
  _OP_REQP = ["instance_name"]
1971
  REQ_BGL = False
1972

    
1973
  def ExpandNames(self):
1974
    self._ExpandAndLockInstance()
1975
    self.needed_locks[locking.LEVEL_NODE] = []
1976
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1977

    
1978
  def DeclareLocks(self, level):
1979
    if level == locking.LEVEL_NODE:
1980
      self._LockInstancesNodes()
1981

    
1982
  def CheckPrereq(self):
1983
    """Check prerequisites.
1984

1985
    This checks that the instance is in the cluster.
1986

1987
    """
1988
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1989
    assert self.instance is not None, \
1990
      "Cannot retrieve locked instance %s" % self.op.instance_name
1991

    
1992
  def Exec(self, feedback_fn):
1993
    """Activate the disks.
1994

1995
    """
1996
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1997
    if not disks_ok:
1998
      raise errors.OpExecError("Cannot activate block devices")
1999

    
2000
    return disks_info
2001

    
2002

    
2003
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2004
  """Prepare the block devices for an instance.
2005

2006
  This sets up the block devices on all nodes.
2007

2008
  Args:
2009
    instance: a ganeti.objects.Instance object
2010
    ignore_secondaries: if true, errors on secondary nodes won't result
2011
                        in an error return from the function
2012

2013
  Returns:
2014
    a tuple of (disks_ok, device_info): disks_ok is False if the operation
2015
    failed, and device_info is a list of
2016
    (host, instance_visible_name, node_visible_name) tuples with the mapping
    from node devices to instance devices
2017
  """
2018
  device_info = []
2019
  disks_ok = True
2020
  iname = instance.name
2021
  # With the two passes mechanism we try to reduce the window of
2022
  # opportunity for the race condition of switching DRBD to primary
2023
  # before handshaking occurred, but we do not eliminate it
2024

    
2025
  # The proper fix would be to wait (with some limits) until the
2026
  # connection has been made and drbd transitions from WFConnection
2027
  # into any other network-connected state (Connected, SyncTarget,
2028
  # SyncSource, etc.)
2029

    
2030
  # 1st pass, assemble on all nodes in secondary mode
2031
  for inst_disk in instance.disks:
2032
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2033
      lu.cfg.SetDiskID(node_disk, node)
2034
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2035
      if not result:
2036
        logging.error("Could not prepare block device %s on node %s"
2037
                      " (is_primary=False, pass=1)", inst_disk.iv_name, node)
2038
        if not ignore_secondaries:
2039
          disks_ok = False
2040

    
2041
  # FIXME: race condition on drbd migration to primary
2042

    
2043
  # 2nd pass, do only the primary node
2044
  for inst_disk in instance.disks:
2045
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2046
      if node != instance.primary_node:
2047
        continue
2048
      lu.cfg.SetDiskID(node_disk, node)
2049
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2050
      if not result:
2051
        logging.error("Could not prepare block device %s on node %s"
2052
                      " (is_primary=True, pass=2)", inst_disk.iv_name, node)
2053
        disks_ok = False
2054
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2055

    
2056
  # leave the disks configured for the primary node
2057
  # this is a workaround that would be fixed better by
2058
  # improving the logical/physical id handling
2059
  for disk in instance.disks:
2060
    lu.cfg.SetDiskID(disk, instance.primary_node)
2061

    
2062
  return disks_ok, device_info
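
# A minimal sketch (not used by any LU; names are illustrative only) of the
# two-pass ordering implemented by _AssembleInstanceDisks above, reduced to
# plain data so the sequencing is easy to see.
def _ExampleTwoPassAssemblyOrder(primary_node, all_nodes):
  """Return the (node, as_primary) sequence used to assemble one disk.

  Every node first assembles the device in secondary mode, and only
  afterwards is the device switched to primary mode on the primary node,
  mirroring the two passes in _AssembleInstanceDisks.

  """
  calls = [(node, False) for node in all_nodes]
  calls.append((primary_node, True))
  return calls

# Example: _ExampleTwoPassAssemblyOrder("node1", ["node1", "node2"]) returns
# [("node1", False), ("node2", False), ("node1", True)].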
2063

    
2064

    
2065
def _StartInstanceDisks(lu, instance, force):
2066
  """Start the disks of an instance.
2067

2068
  """
2069
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2070
                                           ignore_secondaries=force)
2071
  if not disks_ok:
2072
    _ShutdownInstanceDisks(lu, instance)
2073
    if force is not None and not force:
2074
      logging.error("If the message above refers to a secondary node,"
2075
                    " you can retry the operation using '--force'.")
2076
    raise errors.OpExecError("Disk consistency error")
2077

    
2078

    
2079
class LUDeactivateInstanceDisks(NoHooksLU):
2080
  """Shutdown an instance's disks.
2081

2082
  """
2083
  _OP_REQP = ["instance_name"]
2084
  REQ_BGL = False
2085

    
2086
  def ExpandNames(self):
2087
    self._ExpandAndLockInstance()
2088
    self.needed_locks[locking.LEVEL_NODE] = []
2089
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2090

    
2091
  def DeclareLocks(self, level):
2092
    if level == locking.LEVEL_NODE:
2093
      self._LockInstancesNodes()
2094

    
2095
  def CheckPrereq(self):
2096
    """Check prerequisites.
2097

2098
    This checks that the instance is in the cluster.
2099

2100
    """
2101
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2102
    assert self.instance is not None, \
2103
      "Cannot retrieve locked instance %s" % self.op.instance_name
2104

    
2105
  def Exec(self, feedback_fn):
2106
    """Deactivate the disks
2107

2108
    """
2109
    instance = self.instance
2110
    _SafeShutdownInstanceDisks(self, instance)
2111

    
2112

    
2113
def _SafeShutdownInstanceDisks(lu, instance):
2114
  """Shutdown block devices of an instance.
2115

2116
  This function checks if an instance is running, before calling
2117
  _ShutdownInstanceDisks.
2118

2119
  """
2120
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2121
                                      [instance.hypervisor])
2122
  ins_l = ins_l[instance.primary_node]
2123
  if not isinstance(ins_l, list):
2124
    raise errors.OpExecError("Can't contact node '%s'" %
2125
                             instance.primary_node)
2126

    
2127
  if instance.name in ins_l:
2128
    raise errors.OpExecError("Instance is running, can't shutdown"
2129
                             " block devices.")
2130

    
2131
  _ShutdownInstanceDisks(lu, instance)
2132

    
2133

    
2134
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2135
  """Shutdown block devices of an instance.
2136

2137
  This does the shutdown on all nodes of the instance.
2138

2139
  If ignore_primary is true, errors on the primary node are ignored;
2140
  otherwise such errors make the function return False.
2141

2142
  """
2143
  result = True
2144
  for disk in instance.disks:
2145
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2146
      lu.cfg.SetDiskID(top_disk, node)
2147
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2148
        logging.error("Could not shutdown block device %s on node %s",
2149
                      disk.iv_name, node)
2150
        if not ignore_primary or node != instance.primary_node:
2151
          result = False
2152
  return result
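
# Note on the return value above: it only reports whether all relevant
# shutdowns succeeded.  LUFailoverInstance treats a False result as a hard
# error, while LUShutdownInstance simply ignores it.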
2153

    
2154

    
2155
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2156
  """Checks if a node has enough free memory.
2157

2158
  This function checks if a given node has the needed amount of free
2159
  memory. In case the node has less memory or we cannot get the
2160
  information from the node, this function raises an OpPrereqError
2161
  exception.
2162

2163
  @type lu: C{LogicalUnit}
2164
  @param lu: a logical unit from which we get configuration data
2165
  @type node: C{str}
2166
  @param node: the node to check
2167
  @type reason: C{str}
2168
  @param reason: string to use in the error message
2169
  @type requested: C{int}
2170
  @param requested: the amount of memory in MiB to check for
2171
  @type hypervisor: C{str}
2172
  @param hypervisor: the hypervisor to ask for memory stats
2173
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2174
      we cannot check the node
2175

2176
  """
2177
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2178
  if not nodeinfo or not isinstance(nodeinfo, dict):
2179
    raise errors.OpPrereqError("Could not contact node %s for resource"
2180
                             " information" % (node,))
2181

    
2182
  free_mem = nodeinfo[node].get('memory_free')
2183
  if not isinstance(free_mem, int):
2184
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2185
                             " was '%s'" % (node, free_mem))
2186
  if requested > free_mem:
2187
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2188
                             " needed %s MiB, available %s MiB" %
2189
                             (node, reason, requested, free_mem))
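
# Minimal usage sketch (illustrative only, not called anywhere): how a LU is
# expected to invoke _CheckNodeFreeMemory before committing to a node, in the
# same way LUStartupInstance.CheckPrereq does below.
def _ExampleMemoryPrereqCheck(lu, instance):
  """Hypothetical helper showing the usual _CheckNodeFreeMemory call.

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       bep[constants.BE_MEMORY], instance.hypervisor)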
2190

    
2191

    
2192
class LUStartupInstance(LogicalUnit):
2193
  """Starts an instance.
2194

2195
  """
2196
  HPATH = "instance-start"
2197
  HTYPE = constants.HTYPE_INSTANCE
2198
  _OP_REQP = ["instance_name", "force"]
2199
  REQ_BGL = False
2200

    
2201
  def ExpandNames(self):
2202
    self._ExpandAndLockInstance()
2203
    self.needed_locks[locking.LEVEL_NODE] = []
2204
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2205

    
2206
  def DeclareLocks(self, level):
2207
    if level == locking.LEVEL_NODE:
2208
      self._LockInstancesNodes()
2209

    
2210
  def BuildHooksEnv(self):
2211
    """Build hooks env.
2212

2213
    This runs on master, primary and secondary nodes of the instance.
2214

2215
    """
2216
    env = {
2217
      "FORCE": self.op.force,
2218
      }
2219
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2220
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2221
          list(self.instance.secondary_nodes))
2222
    return env, nl, nl
2223

    
2224
  def CheckPrereq(self):
2225
    """Check prerequisites.
2226

2227
    This checks that the instance is in the cluster.
2228

2229
    """
2230
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2231
    assert self.instance is not None, \
2232
      "Cannot retrieve locked instance %s" % self.op.instance_name
2233

    
2234
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2235
    # check bridges existence
2236
    _CheckInstanceBridgesExist(self, instance)
2237

    
2238
    _CheckNodeFreeMemory(self, instance.primary_node,
2239
                         "starting instance %s" % instance.name,
2240
                         bep[constants.BE_MEMORY], instance.hypervisor)
2241

    
2242
  def Exec(self, feedback_fn):
2243
    """Start the instance.
2244

2245
    """
2246
    instance = self.instance
2247
    force = self.op.force
2248
    extra_args = getattr(self.op, "extra_args", "")
2249

    
2250
    self.cfg.MarkInstanceUp(instance.name)
2251

    
2252
    node_current = instance.primary_node
2253

    
2254
    _StartInstanceDisks(self, instance, force)
2255

    
2256
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2257
      _ShutdownInstanceDisks(self, instance)
2258
      raise errors.OpExecError("Could not start instance")
2259

    
2260

    
2261
class LURebootInstance(LogicalUnit):
2262
  """Reboot an instance.
2263

2264
  """
2265
  HPATH = "instance-reboot"
2266
  HTYPE = constants.HTYPE_INSTANCE
2267
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2268
  REQ_BGL = False
2269

    
2270
  def ExpandNames(self):
2271
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2272
                                   constants.INSTANCE_REBOOT_HARD,
2273
                                   constants.INSTANCE_REBOOT_FULL]:
2274
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2275
                                  (constants.INSTANCE_REBOOT_SOFT,
2276
                                   constants.INSTANCE_REBOOT_HARD,
2277
                                   constants.INSTANCE_REBOOT_FULL))
2278
    self._ExpandAndLockInstance()
2279
    self.needed_locks[locking.LEVEL_NODE] = []
2280
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2281

    
2282
  def DeclareLocks(self, level):
2283
    if level == locking.LEVEL_NODE:
2284
      # only a full reboot needs to touch the disks on the secondary nodes
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2285
      self._LockInstancesNodes(primary_only=primary_only)
2286

    
2287
  def BuildHooksEnv(self):
2288
    """Build hooks env.
2289

2290
    This runs on master, primary and secondary nodes of the instance.
2291

2292
    """
2293
    env = {
2294
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2295
      }
2296
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2297
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2298
          list(self.instance.secondary_nodes))
2299
    return env, nl, nl
2300

    
2301
  def CheckPrereq(self):
2302
    """Check prerequisites.
2303

2304
    This checks that the instance is in the cluster.
2305

2306
    """
2307
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2308
    assert self.instance is not None, \
2309
      "Cannot retrieve locked instance %s" % self.op.instance_name
2310

    
2311
    # check bridges existence
2312
    _CheckInstanceBridgesExist(self, instance)
2313

    
2314
  def Exec(self, feedback_fn):
2315
    """Reboot the instance.
2316

2317
    """
2318
    instance = self.instance
2319
    ignore_secondaries = self.op.ignore_secondaries
2320
    reboot_type = self.op.reboot_type
2321
    extra_args = getattr(self.op, "extra_args", "")
2322

    
2323
    node_current = instance.primary_node
2324

    
2325
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2326
                       constants.INSTANCE_REBOOT_HARD]:
2327
      if not self.rpc.call_instance_reboot(node_current, instance,
2328
                                           reboot_type, extra_args):
2329
        raise errors.OpExecError("Could not reboot instance")
2330
    else:
2331
      if not self.rpc.call_instance_shutdown(node_current, instance):
2332
        raise errors.OpExecError("could not shutdown instance for full reboot")
2333
      _ShutdownInstanceDisks(self, instance)
2334
      _StartInstanceDisks(self, instance, ignore_secondaries)
2335
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2336
        _ShutdownInstanceDisks(self, instance)
2337
        raise errors.OpExecError("Could not start instance for full reboot")
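    # summary: soft and hard reboots are delegated to the hypervisor on the
    # primary node, while a full reboot is emulated as an instance shutdown,
    # disk re-activation and a fresh start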
2338

    
2339
    self.cfg.MarkInstanceUp(instance.name)
2340

    
2341

    
2342
class LUShutdownInstance(LogicalUnit):
2343
  """Shutdown an instance.
2344

2345
  """
2346
  HPATH = "instance-stop"
2347
  HTYPE = constants.HTYPE_INSTANCE
2348
  _OP_REQP = ["instance_name"]
2349
  REQ_BGL = False
2350

    
2351
  def ExpandNames(self):
2352
    self._ExpandAndLockInstance()
2353
    self.needed_locks[locking.LEVEL_NODE] = []
2354
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2355

    
2356
  def DeclareLocks(self, level):
2357
    if level == locking.LEVEL_NODE:
2358
      self._LockInstancesNodes()
2359

    
2360
  def BuildHooksEnv(self):
2361
    """Build hooks env.
2362

2363
    This runs on master, primary and secondary nodes of the instance.
2364

2365
    """
2366
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2367
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2368
          list(self.instance.secondary_nodes))
2369
    return env, nl, nl
2370

    
2371
  def CheckPrereq(self):
2372
    """Check prerequisites.
2373

2374
    This checks that the instance is in the cluster.
2375

2376
    """
2377
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2378
    assert self.instance is not None, \
2379
      "Cannot retrieve locked instance %s" % self.op.instance_name
2380

    
2381
  def Exec(self, feedback_fn):
2382
    """Shutdown the instance.
2383

2384
    """
2385
    instance = self.instance
2386
    node_current = instance.primary_node
2387
    self.cfg.MarkInstanceDown(instance.name)
2388
    if not self.rpc.call_instance_shutdown(node_current, instance):
2389
      logging.error("Could not shutdown instance")
2390

    
2391
    _ShutdownInstanceDisks(self, instance)
2392

    
2393

    
2394
class LUReinstallInstance(LogicalUnit):
2395
  """Reinstall an instance.
2396

2397
  """
2398
  HPATH = "instance-reinstall"
2399
  HTYPE = constants.HTYPE_INSTANCE
2400
  _OP_REQP = ["instance_name"]
2401
  REQ_BGL = False
2402

    
2403
  def ExpandNames(self):
2404
    self._ExpandAndLockInstance()
2405
    self.needed_locks[locking.LEVEL_NODE] = []
2406
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2407

    
2408
  def DeclareLocks(self, level):
2409
    if level == locking.LEVEL_NODE:
2410
      self._LockInstancesNodes()
2411

    
2412
  def BuildHooksEnv(self):
2413
    """Build hooks env.
2414

2415
    This runs on master, primary and secondary nodes of the instance.
2416

2417
    """
2418
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2419
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420
          list(self.instance.secondary_nodes))
2421
    return env, nl, nl
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the instance is in the cluster and is not running.
2427

2428
    """
2429
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430
    assert instance is not None, \
2431
      "Cannot retrieve locked instance %s" % self.op.instance_name
2432

    
2433
    if instance.disk_template == constants.DT_DISKLESS:
2434
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2435
                                 self.op.instance_name)
2436
    if instance.status != "down":
2437
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2438
                                 self.op.instance_name)
2439
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2440
                                              instance.name,
2441
                                              instance.hypervisor)
2442
    if remote_info:
2443
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2444
                                 (self.op.instance_name,
2445
                                  instance.primary_node))
2446

    
2447
    self.op.os_type = getattr(self.op, "os_type", None)
2448
    if self.op.os_type is not None:
2449
      # OS verification
2450
      pnode = self.cfg.GetNodeInfo(
2451
        self.cfg.ExpandNodeName(instance.primary_node))
2452
      if pnode is None:
2453
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2454
                                   instance.primary_node)
2455
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2456
      if not os_obj:
2457
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2458
                                   " primary node"  % self.op.os_type)
2459

    
2460
    self.instance = instance
2461

    
2462
  def Exec(self, feedback_fn):
2463
    """Reinstall the instance.
2464

2465
    """
2466
    inst = self.instance
2467

    
2468
    if self.op.os_type is not None:
2469
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2470
      inst.os = self.op.os_type
2471
      self.cfg.Update(inst)
2472

    
2473
    _StartInstanceDisks(self, inst, None)
2474
    try:
2475
      feedback_fn("Running the instance OS create scripts...")
2476
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2477
                                           "sda", "sdb"):
2478
        raise errors.OpExecError("Could not install OS for instance %s"
2479
                                 " on node %s" %
2480
                                 (inst.name, inst.primary_node))
2481
    finally:
2482
      _ShutdownInstanceDisks(self, inst)
2483

    
2484

    
2485
class LURenameInstance(LogicalUnit):
2486
  """Rename an instance.
2487

2488
  """
2489
  HPATH = "instance-rename"
2490
  HTYPE = constants.HTYPE_INSTANCE
2491
  _OP_REQP = ["instance_name", "new_name"]
2492

    
2493
  def BuildHooksEnv(self):
2494
    """Build hooks env.
2495

2496
    This runs on master, primary and secondary nodes of the instance.
2497

2498
    """
2499
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2500
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2501
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2502
          list(self.instance.secondary_nodes))
2503
    return env, nl, nl
2504

    
2505
  def CheckPrereq(self):
2506
    """Check prerequisites.
2507

2508
    This checks that the instance is in the cluster and is not running.
2509

2510
    """
2511
    instance = self.cfg.GetInstanceInfo(
2512
      self.cfg.ExpandInstanceName(self.op.instance_name))
2513
    if instance is None:
2514
      raise errors.OpPrereqError("Instance '%s' not known" %
2515
                                 self.op.instance_name)
2516
    if instance.status != "down":
2517
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2518
                                 self.op.instance_name)
2519
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2520
                                              instance.name,
2521
                                              instance.hypervisor)
2522
    if remote_info:
2523
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2524
                                 (self.op.instance_name,
2525
                                  instance.primary_node))
2526
    self.instance = instance
2527

    
2528
    # new name verification
2529
    name_info = utils.HostInfo(self.op.new_name)
2530

    
2531
    self.op.new_name = new_name = name_info.name
2532
    instance_list = self.cfg.GetInstanceList()
2533
    if new_name in instance_list:
2534
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2535
                                 new_name)
2536

    
2537
    if not getattr(self.op, "ignore_ip", False):
2538
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2539
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2540
                                   (name_info.ip, new_name))
2541

    
2542

    
2543
  def Exec(self, feedback_fn):
2544
    """Rename the instance.
2545

2546
    """
2547
    inst = self.instance
2548
    old_name = inst.name
2549

    
2550
    if inst.disk_template == constants.DT_FILE:
2551
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2552

    
2553
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2554
    # Change the instance lock. This is definitely safe while we hold the BGL
2555
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2556
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2557

    
2558
    # re-read the instance from the configuration after rename
2559
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2560

    
2561
    if inst.disk_template == constants.DT_FILE:
2562
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2563
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2564
                                                     old_file_storage_dir,
2565
                                                     new_file_storage_dir)
2566

    
2567
      if not result:
2568
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2569
                                 " directory '%s' to '%s' (but the instance"
2570
                                 " has been renamed in Ganeti)" % (
2571
                                 inst.primary_node, old_file_storage_dir,
2572
                                 new_file_storage_dir))
2573

    
2574
      if not result[0]:
2575
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2576
                                 " (but the instance has been renamed in"
2577
                                 " Ganeti)" % (old_file_storage_dir,
2578
                                               new_file_storage_dir))
2579

    
2580
    _StartInstanceDisks(self, inst, None)
2581
    try:
2582
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2583
                                               old_name):
2584
        msg = ("Could not run OS rename script for instance %s on node %s"
2585
               " (but the instance has been renamed in Ganeti)" %
2586
               (inst.name, inst.primary_node))
2587
        logging.error(msg)
2588
    finally:
2589
      _ShutdownInstanceDisks(self, inst)
2590

    
2591

    
2592
class LURemoveInstance(LogicalUnit):
2593
  """Remove an instance.
2594

2595
  """
2596
  HPATH = "instance-remove"
2597
  HTYPE = constants.HTYPE_INSTANCE
2598
  _OP_REQP = ["instance_name", "ignore_failures"]
2599
  REQ_BGL = False
2600

    
2601
  def ExpandNames(self):
2602
    self._ExpandAndLockInstance()
2603
    self.needed_locks[locking.LEVEL_NODE] = []
2604
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2605

    
2606
  def DeclareLocks(self, level):
2607
    if level == locking.LEVEL_NODE:
2608
      self._LockInstancesNodes()
2609

    
2610
  def BuildHooksEnv(self):
2611
    """Build hooks env.
2612

2613
    This runs on master, primary and secondary nodes of the instance.
2614

2615
    """
2616
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2617
    nl = [self.cfg.GetMasterNode()]
2618
    return env, nl, nl
2619

    
2620
  def CheckPrereq(self):
2621
    """Check prerequisites.
2622

2623
    This checks that the instance is in the cluster.
2624

2625
    """
2626
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2627
    assert self.instance is not None, \
2628
      "Cannot retrieve locked instance %s" % self.op.instance_name
2629

    
2630
  def Exec(self, feedback_fn):
2631
    """Remove the instance.
2632

2633
    """
2634
    instance = self.instance
2635
    logging.info("Shutting down instance %s on node %s",
2636
                 instance.name, instance.primary_node)
2637

    
2638
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2639
      if self.op.ignore_failures:
2640
        feedback_fn("Warning: can't shutdown instance")
2641
      else:
2642
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2643
                                 (instance.name, instance.primary_node))
2644

    
2645
    logging.info("Removing block devices for instance %s", instance.name)
2646

    
2647
    if not _RemoveDisks(self, instance):
2648
      if self.op.ignore_failures:
2649
        feedback_fn("Warning: can't remove instance's disks")
2650
      else:
2651
        raise errors.OpExecError("Can't remove instance's disks")
2652

    
2653
    logging.info("Removing instance %s out of cluster config", instance.name)
2654

    
2655
    self.cfg.RemoveInstance(instance.name)
2656
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2657

    
2658

    
2659
class LUQueryInstances(NoHooksLU):
2660
  """Logical unit for querying instances.
2661

2662
  """
2663
  _OP_REQP = ["output_fields", "names"]
2664
  REQ_BGL = False
2665

    
2666
  def ExpandNames(self):
2667
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2668
    hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2669
    bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2670
    self.static_fields = frozenset([
2671
      "name", "os", "pnode", "snodes",
2672
      "admin_state", "admin_ram",
2673
      "disk_template", "ip", "mac", "bridge",
2674
      "sda_size", "sdb_size", "vcpus", "tags",
2675
      "network_port",
2676
      "serial_no", "hypervisor", "hvparams",
2677
      ] + hvp + bep)
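    # in addition to the aggregate "hvparams"/"beparams" columns this makes
    # the individual parameters selectable as e.g. "be/memory" (and the
    # corresponding "hv/..." names)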
2678

    
2679
    _CheckOutputFields(static=self.static_fields,
2680
                       dynamic=self.dynamic_fields,
2681
                       selected=self.op.output_fields)
2682

    
2683
    self.needed_locks = {}
2684
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2685
    self.share_locks[locking.LEVEL_NODE] = 1
2686

    
2687
    if self.op.names:
2688
      self.wanted = _GetWantedInstances(self, self.op.names)
2689
    else:
2690
      self.wanted = locking.ALL_SET
2691

    
2692
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2693
    if self.do_locking:
2694
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2695
      self.needed_locks[locking.LEVEL_NODE] = []
2696
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2697

    
2698
  def DeclareLocks(self, level):
2699
    if level == locking.LEVEL_NODE and self.do_locking:
2700
      self._LockInstancesNodes()
2701

    
2702
  def CheckPrereq(self):
2703
    """Check prerequisites.
2704

2705
    """
2706
    pass
2707

    
2708
  def Exec(self, feedback_fn):
2709
    """Computes the list of nodes and their attributes.
2710

2711
    """
2712
    all_info = self.cfg.GetAllInstancesInfo()
2713
    if self.do_locking:
2714
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2715
    elif self.wanted != locking.ALL_SET:
2716
      instance_names = self.wanted
2717
      missing = set(instance_names).difference(all_info.keys())
2718
      if missing:
2719
        raise errors.OpExecError(
2720
          "Some instances were removed before retrieving their data: %s"
2721
          % missing)
2722
    else:
2723
      instance_names = all_info.keys()
2724

    
2725
    instance_names = utils.NiceSort(instance_names)
2726
    instance_list = [all_info[iname] for iname in instance_names]
2727

    
2728
    # begin data gathering
2729

    
2730
    nodes = frozenset([inst.primary_node for inst in instance_list])
2731
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2732

    
2733
    bad_nodes = []
2734
    if self.dynamic_fields.intersection(self.op.output_fields):
2735
      live_data = {}
2736
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2737
      for name in nodes:
2738
        result = node_data[name]
2739
        if result:
2740
          live_data.update(result)
2741
        elif result == False:
2742
          bad_nodes.append(name)
2743
        # else no instance is alive
2744
    else:
2745
      live_data = dict([(name, {}) for name in instance_names])
2746

    
2747
    # end data gathering
2748

    
2749
    HVPREFIX = "hv/"
2750
    BEPREFIX = "be/"
2751
    output = []
2752
    for instance in instance_list:
2753
      iout = []
2754
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2755
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2756
      for field in self.op.output_fields:
2757
        if field == "name":
2758
          val = instance.name
2759
        elif field == "os":
2760
          val = instance.os
2761
        elif field == "pnode":
2762
          val = instance.primary_node
2763
        elif field == "snodes":
2764
          val = list(instance.secondary_nodes)
2765
        elif field == "admin_state":
2766
          val = (instance.status != "down")
2767
        elif field == "oper_state":
2768
          if instance.primary_node in bad_nodes:
2769
            val = None
2770
          else:
2771
            val = bool(live_data.get(instance.name))
2772
        elif field == "status":
2773
          if instance.primary_node in bad_nodes:
2774
            val = "ERROR_nodedown"
2775
          else:
2776
            running = bool(live_data.get(instance.name))
2777
            if running:
2778
              if instance.status != "down":
2779
                val = "running"
2780
              else:
2781
                val = "ERROR_up"
2782
            else:
2783
              if instance.status != "down":
2784
                val = "ERROR_down"
2785
              else:
2786
                val = "ADMIN_down"
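        # summary of the mapping above (live state vs. configured state):
        #   running + marked up   -> "running"
        #   running + marked down -> "ERROR_up"
        #   stopped + marked up   -> "ERROR_down"
        #   stopped + marked down -> "ADMIN_down"
        #   primary node unreachable -> "ERROR_nodedown"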
2787
        elif field == "oper_ram":
2788
          if instance.primary_node in bad_nodes:
2789
            val = None
2790
          elif instance.name in live_data:
2791
            val = live_data[instance.name].get("memory", "?")
2792
          else:
2793
            val = "-"
2794
        elif field == "disk_template":
2795
          val = instance.disk_template
2796
        elif field == "ip":
2797
          val = instance.nics[0].ip
2798
        elif field == "bridge":
2799
          val = instance.nics[0].bridge
2800
        elif field == "mac":
2801
          val = instance.nics[0].mac
2802
        elif field == "sda_size" or field == "sdb_size":
2803
          disk = instance.FindDisk(field[:3])
2804
          if disk is None:
2805
            val = None
2806
          else:
2807
            val = disk.size
2808
        elif field == "tags":
2809
          val = list(instance.GetTags())
2810
        elif field == "serial_no":
2811
          val = instance.serial_no
2812
        elif field == "network_port":
2813
          val = instance.network_port
2814
        elif field == "hypervisor":
2815
          val = instance.hypervisor
2816
        elif field == "hvparams":
2817
          val = i_hv
2818
        elif (field.startswith(HVPREFIX) and
2819
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2820
          val = i_hv.get(field[len(HVPREFIX):], None)
2821
        elif field == "beparams":
2822
          val = i_be
2823
        elif (field.startswith(BEPREFIX) and
2824
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2825
          val = i_be.get(field[len(BEPREFIX):], None)
2826
        else:
2827
          raise errors.ParameterError(field)
2828
        iout.append(val)
2829
      output.append(iout)
2830

    
2831
    return output
2832

    
2833

    
2834
class LUFailoverInstance(LogicalUnit):
2835
  """Failover an instance.
2836

2837
  """
2838
  HPATH = "instance-failover"
2839
  HTYPE = constants.HTYPE_INSTANCE
2840
  _OP_REQP = ["instance_name", "ignore_consistency"]
2841
  REQ_BGL = False
2842

    
2843
  def ExpandNames(self):
2844
    self._ExpandAndLockInstance()
2845
    self.needed_locks[locking.LEVEL_NODE] = []
2846
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2847

    
2848
  def DeclareLocks(self, level):
2849
    if level == locking.LEVEL_NODE:
2850
      self._LockInstancesNodes()
2851

    
2852
  def BuildHooksEnv(self):
2853
    """Build hooks env.
2854

2855
    This runs on master, primary and secondary nodes of the instance.
2856

2857
    """
2858
    env = {
2859
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2860
      }
2861
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2862
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2863
    return env, nl, nl
2864

    
2865
  def CheckPrereq(self):
2866
    """Check prerequisites.
2867

2868
    This checks that the instance is in the cluster.
2869

2870
    """
2871
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2872
    assert self.instance is not None, \
2873
      "Cannot retrieve locked instance %s" % self.op.instance_name
2874

    
2875
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2876
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2877
      raise errors.OpPrereqError("Instance's disk layout is not"
2878
                                 " network mirrored, cannot failover.")
2879

    
2880
    secondary_nodes = instance.secondary_nodes
2881
    if not secondary_nodes:
2882
      raise errors.ProgrammerError("no secondary node but using "
2883
                                   "a mirrored disk template")
2884

    
2885
    target_node = secondary_nodes[0]
2886
    # check memory requirements on the secondary node
2887
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2888
                         instance.name, bep[constants.BE_MEMORY],
2889
                         instance.hypervisor)
2890

    
2891
    # check bridge existence
2892
    brlist = [nic.bridge for nic in instance.nics]
2893
    if not self.rpc.call_bridges_exist(target_node, brlist):
2894
      raise errors.OpPrereqError("One or more target bridges %s do not"
2895
                                 " exist on destination node '%s'" %
2896
                                 (brlist, target_node))
2897

    
2898
  def Exec(self, feedback_fn):
2899
    """Failover an instance.
2900

2901
    The failover is done by shutting it down on its present node and
2902
    starting it on the secondary.
2903

2904
    """
2905
    instance = self.instance
2906

    
2907
    source_node = instance.primary_node
2908
    target_node = instance.secondary_nodes[0]
2909

    
2910
    feedback_fn("* checking disk consistency between source and target")
2911
    for dev in instance.disks:
2912
      # for drbd, these are drbd over lvm
2913
      if not _CheckDiskConsistency(self, dev, target_node, False):
2914
        if instance.status == "up" and not self.op.ignore_consistency:
2915
          raise errors.OpExecError("Disk %s is degraded on target node,"
2916
                                   " aborting failover." % dev.iv_name)
2917

    
2918
    feedback_fn("* shutting down instance on source node")
2919
    logging.info("Shutting down instance %s on node %s",
2920
                 instance.name, source_node)
2921

    
2922
    if not self.rpc.call_instance_shutdown(source_node, instance):
2923
      if self.op.ignore_consistency:
2924
        logging.error("Could not shutdown instance %s on node %s. Proceeding"
2925
                      " anyway. Please make sure node %s is down",
2926
                      instance.name, source_node, source_node)
2927
      else:
2928
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2929
                                 (instance.name, source_node))
2930

    
2931
    feedback_fn("* deactivating the instance's disks on source node")
2932
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2933
      raise errors.OpExecError("Can't shut down the instance's disks.")
2934

    
2935
    instance.primary_node = target_node
2936
    # distribute new instance config to the other nodes
2937
    self.cfg.Update(instance)
2938

    
2939
    # Only start the instance if it's marked as up
2940
    if instance.status == "up":
2941
      feedback_fn("* activating the instance's disks on target node")
2942
      logging.info("Starting instance %s on node %s",
2943
                   instance.name, target_node)
2944

    
2945
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2946
                                               ignore_secondaries=True)
2947
      if not disks_ok:
2948
        _ShutdownInstanceDisks(self, instance)
2949
        raise errors.OpExecError("Can't activate the instance's disks")
2950

    
2951
      feedback_fn("* starting the instance on the target node")
2952
      if not self.rpc.call_instance_start(target_node, instance, None):
2953
        _ShutdownInstanceDisks(self, instance)
2954
        raise errors.OpExecError("Could not start instance %s on node %s." %
2955
                                 (instance.name, target_node))
2956

    
2957

    
2958
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2959
  """Create a tree of block devices on the primary node.
2960

2961
  This always creates all devices.
2962

2963
  """
2964
  if device.children:
2965
    for child in device.children:
2966
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2967
        return False
2968

    
2969
  lu.cfg.SetDiskID(device, node)
2970
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2971
                                       instance.name, True, info)
2972
  if not new_id:
2973
    return False
2974
  if device.physical_id is None:
2975
    device.physical_id = new_id
2976
  return True
2977

    
2978

    
2979
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2980
  """Create a tree of block devices on a secondary node.
2981

2982
  If this device type has to be created on secondaries, create it and
2983
  all its children.
2984

2985
  If not, just recurse to children keeping the same 'force' value.
2986

2987
  """
2988
  if device.CreateOnSecondary():
2989
    force = True
2990
  if device.children:
2991
    for child in device.children:
2992
      if not _CreateBlockDevOnSecondary(lu, node, instance,
2993
                                        child, force, info):
2994
        return False
2995

    
2996
  if not force:
2997
    return True
2998
  lu.cfg.SetDiskID(device, node)
2999
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3000
                                       instance.name, False, info)
3001
  if not new_id:
3002
    return False
3003
  if device.physical_id is None:
3004
    device.physical_id = new_id
3005
  return True
3006

    
3007

    
3008
def _GenerateUniqueNames(lu, exts):
3009
  """Generate a suitable LV name.
3010

3011
  This generates one unique logical volume name for each extension given.
3012

3013
  """
3014
  results = []
3015
  for val in exts:
3016
    new_id = lu.cfg.GenerateUniqueID()
3017
    results.append("%s%s" % (new_id, val))
3018
  return results
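# Editor's sketch (not part of the original module): the shape of the names
# the helper above returns. For exts == [".sda_data", ".sda_meta"] the result
# looks like the hard-coded (made-up) values below, i.e. one fresh unique id
# from the cluster configuration with the extension appended.
def _ExampleUniqueNames():
  """Return example LV names as produced for [".sda_data", ".sda_meta"]."""
  return ["3f42b9e0-8d1a-4a5e-9c33-f1e6a2b7c0d1.sda_data",
          "9a77c2d4-06bb-4f19-8e02-5b8f4c1d2e3f.sda_meta"]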
3019

    
3020

    
3021
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3022
                         p_minor, s_minor):
3023
  """Generate a drbd8 device complete with its children.
3024

3025
  """
3026
  port = lu.cfg.AllocatePort()
3027
  vgname = lu.cfg.GetVGName()
3028
  shared_secret = lu.cfg.GenerateDRBDSecret()
3029
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3030
                          logical_id=(vgname, names[0]))
3031
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3032
                          logical_id=(vgname, names[1]))
3033
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3034
                          logical_id=(primary, secondary, port,
3035
                                      p_minor, s_minor,
3036
                                      shared_secret),
3037
                          children=[dev_data, dev_meta],
3038
                          iv_name=iv_name)
3039
  return drbd_dev
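# Editor's note (not original code): the DRBD8 logical_id assembled above is a
# positional 6-tuple that later code (e.g. LUReplaceDisks._ExecD8Secondary)
# indexes directly; the values below are invented example data only.
def _ExampleDRBD8LogicalId():
  """Return a sample (primary, secondary, port, p_minor, s_minor, secret)."""
  return ("node1.example.com",   # [0] primary node
          "node2.example.com",   # [1] secondary node
          11000,                 # [2] DRBD port allocated from the cluster
          0,                     # [3] minor on the primary
          0,                     # [4] minor on the secondary
          "0123456789abcdef")    # [5] shared secret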
3040

    
3041

    
3042
def _GenerateDiskTemplate(lu, template_name,
3043
                          instance_name, primary_node,
3044
                          secondary_nodes, disk_sz, swap_sz,
3045
                          file_storage_dir, file_driver):
3046
  """Generate the entire disk layout for a given template type.
3047

3048
  """
3049
  #TODO: compute space requirements
3050

    
3051
  vgname = lu.cfg.GetVGName()
3052
  if template_name == constants.DT_DISKLESS:
3053
    disks = []
3054
  elif template_name == constants.DT_PLAIN:
3055
    if len(secondary_nodes) != 0:
3056
      raise errors.ProgrammerError("Wrong template configuration")
3057

    
3058
    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3059
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3060
                           logical_id=(vgname, names[0]),
3061
                           iv_name = "sda")
3062
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3063
                           logical_id=(vgname, names[1]),
3064
                           iv_name = "sdb")
3065
    disks = [sda_dev, sdb_dev]
3066
  elif template_name == constants.DT_DRBD8:
3067
    if len(secondary_nodes) != 1:
3068
      raise errors.ProgrammerError("Wrong template configuration")
3069
    remote_node = secondary_nodes[0]
3070
    (minor_pa, minor_pb,
3071
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3072
      [primary_node, primary_node, remote_node, remote_node], instance_name)
3073

    
3074
    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3075
                                      ".sdb_data", ".sdb_meta"])
3076
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3077
                                        disk_sz, names[0:2], "sda",
3078
                                        minor_pa, minor_sa)
3079
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3080
                                        swap_sz, names[2:4], "sdb",
3081
                                        minor_pb, minor_sb)
3082
    disks = [drbd_sda_dev, drbd_sdb_dev]
3083
  elif template_name == constants.DT_FILE:
3084
    if len(secondary_nodes) != 0:
3085
      raise errors.ProgrammerError("Wrong template configuration")
3086

    
3087
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3088
                                iv_name="sda", logical_id=(file_driver,
3089
                                "%s/sda" % file_storage_dir))
3090
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3091
                                iv_name="sdb", logical_id=(file_driver,
3092
                                "%s/sdb" % file_storage_dir))
3093
    disks = [file_sda_dev, file_sdb_dev]
3094
  else:
3095
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3096
  return disks
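# Editor's summary (not original code): the device sets the generator above
# produces for the fixed sda/sdb layout, kept as plain data for illustration.
_EXAMPLE_TEMPLATE_DEVICES = {
  constants.DT_DISKLESS: [],
  constants.DT_PLAIN: ["LV sda", "LV sdb"],
  constants.DT_DRBD8: ["DRBD8 sda over data+meta LVs",
                       "DRBD8 sdb over data+meta LVs"],
  constants.DT_FILE: ["file-backed sda", "file-backed sdb"],
  }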
3097

    
3098

    
3099
def _GetInstanceInfoText(instance):
3100
  """Compute that text that should be added to the disk's metadata.
3101

3102
  """
3103
  return "originstname+%s" % instance.name
3104

    
3105

    
3106
def _CreateDisks(lu, instance):
3107
  """Create all disks for an instance.
3108

3109
  This abstracts away some work from AddInstance.
3110

3111
  Args:
3112
    instance: the instance object
3113

3114
  Returns:
3115
    True or False showing the success of the creation process
3116

3117
  """
3118
  info = _GetInstanceInfoText(instance)
3119

    
3120
  if instance.disk_template == constants.DT_FILE:
3121
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3122
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3123
                                                 file_storage_dir)
3124

    
3125
    if not result:
3126
      logging.error("Could not connect to node '%s'", instance.primary_node)
3127
      return False
3128

    
3129
    if not result[0]:
3130
      logging.error("Failed to create directory '%s'", file_storage_dir)
3131
      return False
3132

    
3133
  for device in instance.disks:
3134
    logging.info("Creating volume %s for instance %s",
3135
                 device.iv_name, instance.name)
3136
    #HARDCODE
3137
    for secondary_node in instance.secondary_nodes:
3138
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3139
                                        device, False, info):
3140
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3141
                      device.iv_name, device, secondary_node)
3142
        return False
3143
    #HARDCODE
3144
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3145
                                    instance, device, info):
3146
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3147
      return False
3148

    
3149
  return True
3150

    
3151

    
3152
def _RemoveDisks(lu, instance):
3153
  """Remove all disks for an instance.
3154

3155
  This abstracts away some work from `AddInstance()` and
3156
  `RemoveInstance()`. Note that in case some of the devices couldn't
3157
  be removed, the removal will continue with the other ones (compare
3158
  with `_CreateDisks()`).
3159

3160
  Args:
3161
    instance: the instance object
3162

3163
  Returns:
3164
    True or False showing the success of the removal process
3165

3166
  """
3167
  logging.info("Removing block devices for instance %s", instance.name)
3168

    
3169
  result = True
3170
  for device in instance.disks:
3171
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3172
      lu.cfg.SetDiskID(disk, node)
3173
      if not lu.rpc.call_blockdev_remove(node, disk):
3174
        logging.error("Could not remove block device %s on node %s,"
3175
                      " continuing anyway", device.iv_name, node)
3176
        result = False
3177

    
3178
  if instance.disk_template == constants.DT_FILE:
3179
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3180
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3181
                                               file_storage_dir):
3182
      logging.error("Could not remove directory '%s'", file_storage_dir)
3183
      result = False
3184

    
3185
  return result
3186

    
3187

    
3188
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3189
  """Compute disk size requirements in the volume group
3190

3191
  This is currently hard-coded for the two-drive layout.
3192

3193
  """
3194
  # Required free disk space as a function of disk and swap space
3195
  req_size_dict = {
3196
    constants.DT_DISKLESS: None,
3197
    constants.DT_PLAIN: disk_size + swap_size,
3198
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
3199
    constants.DT_DRBD8: disk_size + swap_size + 256,
3200
    constants.DT_FILE: None,
3201
  }
3202

    
3203
  if disk_template not in req_size_dict:
3204
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3205
                                 " is unknown" %  disk_template)
3206

    
3207
  return req_size_dict[disk_template]
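# Worked example (editor's addition, assumed sizes): a 10 GiB data disk plus
# 4 GiB of swap needs 14336 MB of volume group space as 'plain' and 14592 MB
# as 'drbd8' (the extra 256 MB covering the two 128 MB DRBD metadata volumes);
# the 'diskless' and 'file' templates need no volume group space at all.
def _ExampleDiskSizeMath():
  """Return the computed requirements for a sample 10240/4096 MB layout."""
  plain = _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096)   # 14336
  drbd8 = _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)   # 14592
  return plain, drbd8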
3208

    
3209

    
3210
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3211
  """Hypervisor parameter validation.
3212

3213
  This function abstracts the hypervisor parameter validation to be
3214
  used in both instance create and instance modify.
3215

3216
  @type lu: L{LogicalUnit}
3217
  @param lu: the logical unit for which we check
3218
  @type nodenames: list
3219
  @param nodenames: the list of nodes on which we should check
3220
  @type hvname: string
3221
  @param hvname: the name of the hypervisor we should use
3222
  @type hvparams: dict
3223
  @param hvparams: the parameters which we need to check
3224
  @raise errors.OpPrereqError: if the parameters are not valid
3225

3226
  """
3227
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3228
                                                  hvname,
3229
                                                  hvparams)
3230
  for node in nodenames:
3231
    info = hvinfo.get(node, None)
3232
    if not info or not isinstance(info, (tuple, list)):
3233
      raise errors.OpPrereqError("Cannot get current information"
3234
                                 " from node '%s' (%s)" % (node, info))
3235
    if not info[0]:
3236
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3237
                                 " %s" % info[1])
3238

    
3239

    
3240
class LUCreateInstance(LogicalUnit):
3241
  """Create an instance.
3242

3243
  """
3244
  HPATH = "instance-add"
3245
  HTYPE = constants.HTYPE_INSTANCE
3246
  _OP_REQP = ["instance_name", "disk_size",
3247
              "disk_template", "swap_size", "mode", "start",
3248
              "wait_for_sync", "ip_check", "mac",
3249
              "hvparams", "beparams"]
3250
  REQ_BGL = False
3251

    
3252
  def _ExpandNode(self, node):
3253
    """Expands and checks one node name.
3254

3255
    """
3256
    node_full = self.cfg.ExpandNodeName(node)
3257
    if node_full is None:
3258
      raise errors.OpPrereqError("Unknown node %s" % node)
3259
    return node_full
3260

    
3261
  def ExpandNames(self):
3262
    """ExpandNames for CreateInstance.
3263

3264
    Figure out the right locks for instance creation.
3265

3266
    """
3267
    self.needed_locks = {}
3268

    
3269
    # set optional parameters to None if they don't exist
3270
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3271
      if not hasattr(self.op, attr):
3272
        setattr(self.op, attr, None)
3273

    
3274
    # cheap checks, mostly valid constants given
3275

    
3276
    # verify creation mode
3277
    if self.op.mode not in (constants.INSTANCE_CREATE,
3278
                            constants.INSTANCE_IMPORT):
3279
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3280
                                 self.op.mode)
3281

    
3282
    # disk template and mirror node verification
3283
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3284
      raise errors.OpPrereqError("Invalid disk template name")
3285

    
3286
    if self.op.hypervisor is None:
3287
      self.op.hypervisor = self.cfg.GetHypervisorType()
3288

    
3289
    cluster = self.cfg.GetClusterInfo()
3290
    enabled_hvs = cluster.enabled_hypervisors
3291
    if self.op.hypervisor not in enabled_hvs:
3292
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3293
                                 " cluster (%s)" % (self.op.hypervisor,
3294
                                  ",".join(enabled_hvs)))
3295

    
3296
    # check hypervisor parameter syntax (locally)
3297

    
3298
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3299
                                  self.op.hvparams)
3300
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3301
    hv_type.CheckParameterSyntax(filled_hvp)
3302

    
3303
    # fill and remember the beparams dict
3304
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3305
                                    self.op.beparams)
3306

    
3307
    #### instance parameters check
3308

    
3309
    # instance name verification
3310
    hostname1 = utils.HostInfo(self.op.instance_name)
3311
    self.op.instance_name = instance_name = hostname1.name
3312

    
3313
    # this is just a preventive check, but someone might still add this
3314
    # instance in the meantime, and creation will fail at lock-add time
3315
    if instance_name in self.cfg.GetInstanceList():
3316
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3317
                                 instance_name)
3318

    
3319
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3320

    
3321
    # ip validity checks
3322
    ip = getattr(self.op, "ip", None)
3323
    if ip is None or ip.lower() == "none":
3324
      inst_ip = None
3325
    elif ip.lower() == "auto":
3326
      inst_ip = hostname1.ip
3327
    else:
3328
      if not utils.IsValidIP(ip):
3329
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3330
                                   " like a valid IP" % ip)
3331
      inst_ip = ip
3332
    self.inst_ip = self.op.ip = inst_ip
3333
    # used in CheckPrereq for ip ping check
3334
    self.check_ip = hostname1.ip
3335

    
3336
    # MAC address verification
3337
    if self.op.mac != "auto":
3338
      if not utils.IsValidMac(self.op.mac.lower()):
3339
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3340
                                   self.op.mac)
3341

    
3342
    # file storage checks
3343
    if (self.op.file_driver and
3344
        not self.op.file_driver in constants.FILE_DRIVER):
3345
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3346
                                 self.op.file_driver)
3347

    
3348
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3349
      raise errors.OpPrereqError("File storage directory path not absolute")
3350

    
3351
    ### Node/iallocator related checks
3352
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3353
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3354
                                 " node must be given")
3355

    
3356
    if self.op.iallocator:
3357
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3358
    else:
3359
      self.op.pnode = self._ExpandNode(self.op.pnode)
3360
      nodelist = [self.op.pnode]
3361
      if self.op.snode is not None:
3362
        self.op.snode = self._ExpandNode(self.op.snode)
3363
        nodelist.append(self.op.snode)
3364
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3365

    
3366
    # in case of import lock the source node too
3367
    if self.op.mode == constants.INSTANCE_IMPORT:
3368
      src_node = getattr(self.op, "src_node", None)
3369
      src_path = getattr(self.op, "src_path", None)
3370

    
3371
      if src_node is None or src_path is None:
3372
        raise errors.OpPrereqError("Importing an instance requires source"
3373
                                   " node and path options")
3374

    
3375
      if not os.path.isabs(src_path):
3376
        raise errors.OpPrereqError("The source path must be absolute")
3377

    
3378
      self.op.src_node = src_node = self._ExpandNode(src_node)
3379
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3380
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3381

    
3382
    else: # INSTANCE_CREATE
3383
      if getattr(self.op, "os_type", None) is None:
3384
        raise errors.OpPrereqError("No guest OS specified")
3385

    
3386
  def _RunAllocator(self):
3387
    """Run the allocator based on input opcode.
3388

3389
    """
3390
    disks = [{"size": self.op.disk_size, "mode": "w"},
3391
             {"size": self.op.swap_size, "mode": "w"}]
3392
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3393
             "bridge": self.op.bridge}]
3394
    ial = IAllocator(self,
3395
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3396
                     name=self.op.instance_name,
3397
                     disk_template=self.op.disk_template,
3398
                     tags=[],
3399
                     os=self.op.os_type,
3400
                     vcpus=self.be_full[constants.BE_VCPUS],
3401
                     mem_size=self.be_full[constants.BE_MEMORY],
3402
                     disks=disks,
3403
                     nics=nics,
3404
                     )
3405

    
3406
    ial.Run(self.op.iallocator)
3407

    
3408
    if not ial.success:
3409
      raise errors.OpPrereqError("Can't compute nodes using"
3410
                                 " iallocator '%s': %s" % (self.op.iallocator,
3411
                                                           ial.info))
3412
    if len(ial.nodes) != ial.required_nodes:
3413
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3414
                                 " of nodes (%s), required %s" %
3415
                                 (self.op.iallocator, len(ial.nodes),
3416
                                  ial.required_nodes))
3417
    self.op.pnode = ial.nodes[0]
3418
    feedback_fn("Selected nodes for the instance: %s" %
3419
                (", ".join(ial.nodes),))
3420
    logging.info("Selected nodes for instance %s via iallocator %s: %s",
3421
                 self.op.instance_name, self.op.iallocator, ial.nodes)
3422
    if ial.required_nodes == 2:
3423
      self.op.snode = ial.nodes[1]
3424

    
3425
  def BuildHooksEnv(self):
3426
    """Build hooks env.
3427

3428
    This runs on master, primary and secondary nodes of the instance.
3429

3430
    """
3431
    env = {
3432
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3433
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3434
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3435
      "INSTANCE_ADD_MODE": self.op.mode,
3436
      }
3437
    if self.op.mode == constants.INSTANCE_IMPORT:
3438
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3439
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3440
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3441

    
3442
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3443
      primary_node=self.op.pnode,
3444
      secondary_nodes=self.secondaries,
3445
      status=self.instance_status,
3446
      os_type=self.op.os_type,
3447
      memory=self.be_full[constants.BE_MEMORY],
3448
      vcpus=self.be_full[constants.BE_VCPUS],
3449
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3450
    ))
3451

    
3452
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3453
          self.secondaries)
3454
    return env, nl, nl
3455

    
3456

    
3457
  def CheckPrereq(self):
3458
    """Check prerequisites.
3459

3460
    """
3461
    if (not self.cfg.GetVGName() and
3462
        self.op.disk_template not in constants.DTS_NOT_LVM):
3463
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3464
                                 " instances")
3465

    
3466

    
3467
    if self.op.mode == constants.INSTANCE_IMPORT:
3468
      src_node = self.op.src_node
3469
      src_path = self.op.src_path
3470

    
3471
      export_info = self.rpc.call_export_info(src_node, src_path)
3472

    
3473
      if not export_info:
3474
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3475

    
3476
      if not export_info.has_section(constants.INISECT_EXP):
3477
        raise errors.ProgrammerError("Corrupted export config")
3478

    
3479
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3480
      if int(ei_version) != constants.EXPORT_VERSION:
3481
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3482
                                   (ei_version, constants.EXPORT_VERSION))
3483

    
3484
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3485
        raise errors.OpPrereqError("Can't import instance with more than"
3486
                                   " one data disk")
3487

    
3488
      # FIXME: are the old os-es, disk sizes, etc. useful?
3489
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3490
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3491
                                                         'disk0_dump'))
3492
      self.src_image = diskimage
3493

    
3494
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3495

    
3496
    if self.op.start and not self.op.ip_check:
3497
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3498
                                 " adding an instance in start mode")
3499

    
3500
    if self.op.ip_check:
3501
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3502
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3503
                                   (self.check_ip, self.op.instance_name))
3504

    
3505
    # bridge verification
3506
    bridge = getattr(self.op, "bridge", None)
3507
    if bridge is None:
3508
      self.op.bridge = self.cfg.GetDefBridge()
3509
    else:
3510
      self.op.bridge = bridge
3511

    
3512
    #### allocator run
3513

    
3514
    if self.op.iallocator is not None:
3515
      self._RunAllocator()
3516

    
3517
    #### node related checks
3518

    
3519
    # check primary node
3520
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3521
    assert self.pnode is not None, \
3522
      "Cannot retrieve locked node %s" % self.op.pnode
3523
    self.secondaries = []
3524

    
3525
    # mirror node verification
3526
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3527
      if self.op.snode is None:
3528
        raise errors.OpPrereqError("The networked disk templates need"
3529
                                   " a mirror node")
3530
      if self.op.snode == pnode.name:
3531
        raise errors.OpPrereqError("The secondary node cannot be"
3532
                                   " the primary node.")
3533
      self.secondaries.append(self.op.snode)
3534

    
3535
    nodenames = [pnode.name] + self.secondaries
3536

    
3537
    req_size = _ComputeDiskSize(self.op.disk_template,
3538
                                self.op.disk_size, self.op.swap_size)
3539

    
3540
    # Check lv size requirements
3541
    if req_size is not None:
3542
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3543
                                         self.op.hypervisor)
3544
      for node in nodenames:
3545
        info = nodeinfo.get(node, None)
3546
        if not info:
3547
          raise errors.OpPrereqError("Cannot get current information"
3548
                                     " from node '%s'" % node)
3549
        vg_free = info.get('vg_free', None)
3550
        if not isinstance(vg_free, int):
3551
          raise errors.OpPrereqError("Can't compute free disk space on"
3552
                                     " node %s" % node)
3553
        if req_size > info['vg_free']:
3554
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3555
                                     " %d MB available, %d MB required" %
3556
                                     (node, info['vg_free'], req_size))
3557

    
3558
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3559

    
3560
    # os verification
3561
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3562
    if not os_obj:
3563
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3564
                                 " primary node"  % self.op.os_type)
3565

    
3566
    # bridge check on primary node
3567
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3568
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3569
                                 " destination node '%s'" %
3570
                                 (self.op.bridge, pnode.name))
3571

    
3572
    # memory check on primary node
3573
    if self.op.start:
3574
      _CheckNodeFreeMemory(self, self.pnode.name,
3575
                           "creating instance %s" % self.op.instance_name,
3576
                           self.be_full[constants.BE_MEMORY],
3577
                           self.op.hypervisor)
3578

    
3579
    if self.op.start:
3580
      self.instance_status = 'up'
3581
    else:
3582
      self.instance_status = 'down'
3583

    
3584
  def Exec(self, feedback_fn):
3585
    """Create and add the instance to the cluster.
3586

3587
    """
3588
    instance = self.op.instance_name
3589
    pnode_name = self.pnode.name
3590

    
3591
    if self.op.mac == "auto":
3592
      mac_address = self.cfg.GenerateMAC()
3593
    else:
3594
      mac_address = self.op.mac
3595

    
3596
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3597
    if self.inst_ip is not None:
3598
      nic.ip = self.inst_ip
3599

    
3600
    ht_kind = self.op.hypervisor
3601
    if ht_kind in constants.HTS_REQ_PORT:
3602
      network_port = self.cfg.AllocatePort()
3603
    else:
3604
      network_port = None
3605

    
3606
    ##if self.op.vnc_bind_address is None:
3607
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3608

    
3609
    # this is needed because os.path.join does not accept None arguments
3610
    if self.op.file_storage_dir is None:
3611
      string_file_storage_dir = ""
3612
    else:
3613
      string_file_storage_dir = self.op.file_storage_dir
3614

    
3615
    # build the full file storage dir path
3616
    file_storage_dir = os.path.normpath(os.path.join(
3617
                                        self.cfg.GetFileStorageDir(),
3618
                                        string_file_storage_dir, instance))
3619

    
3620

    
3621
    disks = _GenerateDiskTemplate(self,
3622
                                  self.op.disk_template,
3623
                                  instance, pnode_name,
3624
                                  self.secondaries, self.op.disk_size,
3625
                                  self.op.swap_size,
3626
                                  file_storage_dir,
3627
                                  self.op.file_driver)
3628

    
3629
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3630
                            primary_node=pnode_name,
3631
                            nics=[nic], disks=disks,
3632
                            disk_template=self.op.disk_template,
3633
                            status=self.instance_status,
3634
                            network_port=network_port,
3635
                            beparams=self.op.beparams,
3636
                            hvparams=self.op.hvparams,
3637
                            hypervisor=self.op.hypervisor,
3638
                            )
3639

    
3640
    feedback_fn("* creating instance disks...")
3641
    if not _CreateDisks(self, iobj):
3642
      _RemoveDisks(self, iobj)
3643
      self.cfg.ReleaseDRBDMinors(instance)
3644
      raise errors.OpExecError("Device creation failed, reverting...")
3645

    
3646
    feedback_fn("adding instance %s to cluster config" % instance)
3647

    
3648
    self.cfg.AddInstance(iobj)
3649
    # Declare that we don't want to remove the instance lock anymore, as we've
3650
    # added the instance to the config
3651
    del self.remove_locks[locking.LEVEL_INSTANCE]
3652
    # Remove the temporary assignments for the instance's drbds
3653
    self.cfg.ReleaseDRBDMinors(instance)
3654

    
3655
    if self.op.wait_for_sync:
3656
      disk_abort = not _WaitForSync(self, iobj)
3657
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3658
      # make sure the disks are not degraded (still sync-ing is ok)
3659
      time.sleep(15)
3660
      feedback_fn("* checking mirrors status")
3661
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3662
    else:
3663
      disk_abort = False
3664

    
3665
    if disk_abort:
3666
      _RemoveDisks(self, iobj)
3667
      self.cfg.RemoveInstance(iobj.name)
3668
      # Make sure the instance lock gets removed
3669
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3670
      raise errors.OpExecError("There are some degraded disks for"
3671
                               " this instance")
3672

    
3673
    feedback_fn("creating os for instance %s on node %s" %
3674
                (instance, pnode_name))
3675

    
3676
    if iobj.disk_template != constants.DT_DISKLESS:
3677
      if self.op.mode == constants.INSTANCE_CREATE:
3678
        feedback_fn("* running the instance OS create scripts...")
3679
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
3680
          raise errors.OpExecError("could not add os for instance %s"
3681
                                   " on node %s" %
3682
                                   (instance, pnode_name))
3683

    
3684
      elif self.op.mode == constants.INSTANCE_IMPORT:
3685
        feedback_fn("* running the instance OS import scripts...")
3686
        src_node = self.op.src_node
3687
        src_image = self.src_image
3688
        cluster_name = self.cfg.GetClusterName()
3689
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3690
                                                src_node, src_image,
3691
                                                cluster_name):
3692
          raise errors.OpExecError("Could not import os for instance"
3693
                                   " %s on node %s" %
3694
                                   (instance, pnode_name))
3695
      else:
3696
        # also checked in the prereq part
3697
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3698
                                     % self.op.mode)
3699

    
3700
    if self.op.start:
3701
      logging.info("Starting instance %s on node %s", instance, pnode_name)
3702
      feedback_fn("* starting instance...")
3703
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3704
        raise errors.OpExecError("Could not start instance")
3705

    
3706

    
3707
class LUConnectConsole(NoHooksLU):
3708
  """Connect to an instance's console.
3709

3710
  This is somewhat special in that it returns the command line that
3711
  you need to run on the master node in order to connect to the
3712
  console.
3713

3714
  """
3715
  _OP_REQP = ["instance_name"]
3716
  REQ_BGL = False
3717

    
3718
  def ExpandNames(self):
3719
    self._ExpandAndLockInstance()
3720

    
3721
  def CheckPrereq(self):
3722
    """Check prerequisites.
3723

3724
    This checks that the instance is in the cluster.
3725

3726
    """
3727
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3728
    assert self.instance is not None, \
3729
      "Cannot retrieve locked instance %s" % self.op.instance_name
3730

    
3731
  def Exec(self, feedback_fn):
3732
    """Connect to the console of an instance
3733

3734
    """
3735
    instance = self.instance
3736
    node = instance.primary_node
3737

    
3738
    node_insts = self.rpc.call_instance_list([node],
3739
                                             [instance.hypervisor])[node]
3740
    if node_insts is False:
3741
      raise errors.OpExecError("Can't connect to node %s." % node)
3742

    
3743
    if instance.name not in node_insts:
3744
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3745

    
3746
    logging.debug("Connecting to console of %s on %s", instance.name, node)
3747

    
3748
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3749
    console_cmd = hyper.GetShellCommandForConsole(instance)
3750

    
3751
    # build ssh cmdline
3752
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
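    # Editor's note (approximate, not original code): the returned value is an
    # ssh command line to be run on the master node, roughly of the form
    #   ssh -t root@<primary node> '<hypervisor console command>'
    # e.g. for Xen something along the lines of "xm console <instance name>".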
3753

    
3754

    
3755
class LUReplaceDisks(LogicalUnit):
3756
  """Replace the disks of an instance.
3757

3758
  """
3759
  HPATH = "mirrors-replace"
3760
  HTYPE = constants.HTYPE_INSTANCE
3761
  _OP_REQP = ["instance_name", "mode", "disks"]
3762
  REQ_BGL = False
3763

    
3764
  def ExpandNames(self):
3765
    self._ExpandAndLockInstance()
3766

    
3767
    if not hasattr(self.op, "remote_node"):
3768
      self.op.remote_node = None
3769

    
3770
    ia_name = getattr(self.op, "iallocator", None)
3771
    if ia_name is not None:
3772
      if self.op.remote_node is not None:
3773
        raise errors.OpPrereqError("Give either the iallocator or the new"
3774
                                   " secondary, not both")
3775
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3776
    elif self.op.remote_node is not None:
3777
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3778
      if remote_node is None:
3779
        raise errors.OpPrereqError("Node '%s' not known" %
3780
                                   self.op.remote_node)
3781
      self.op.remote_node = remote_node
3782
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3783
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3784
    else:
3785
      self.needed_locks[locking.LEVEL_NODE] = []
3786
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3787

    
3788
  def DeclareLocks(self, level):
3789
    # If we're not already locking all nodes in the set we have to declare the
3790
    # instance's primary/secondary nodes.
3791
    if (level == locking.LEVEL_NODE and
3792
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3793
      self._LockInstancesNodes()
3794

    
3795
  def _RunAllocator(self):
3796
    """Compute a new secondary node using an IAllocator.
3797

3798
    """
3799
    ial = IAllocator(self,
3800
                     mode=constants.IALLOCATOR_MODE_RELOC,
3801
                     name=self.op.instance_name,
3802
                     relocate_from=[self.sec_node])
3803

    
3804
    ial.Run(self.op.iallocator)
3805

    
3806
    if not ial.success:
3807
      raise errors.OpPrereqError("Can't compute nodes using"
3808
                                 " iallocator '%s': %s" % (self.op.iallocator,
3809
                                                           ial.info))
3810
    if len(ial.nodes) != ial.required_nodes:
3811
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3812
                                 " of nodes (%s), required %s" %
3813
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3814
    self.op.remote_node = ial.nodes[0]
3815
    feedback_fn("Selected new secondary for the instance: %s" %
3816
                      self.op.remote_node)
3817

    
3818
  def BuildHooksEnv(self):
3819
    """Build hooks env.
3820

3821
    This runs on the master, the primary and all the secondaries.
3822

3823
    """
3824
    env = {
3825
      "MODE": self.op.mode,
3826
      "NEW_SECONDARY": self.op.remote_node,
3827
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3828
      }
3829
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3830
    nl = [
3831
      self.cfg.GetMasterNode(),
3832
      self.instance.primary_node,
3833
      ]
3834
    if self.op.remote_node is not None:
3835
      nl.append(self.op.remote_node)
3836
    return env, nl, nl
3837

    
3838
  def CheckPrereq(self):
3839
    """Check prerequisites.
3840

3841
    This checks that the instance is in the cluster.
3842

3843
    """
3844
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3845
    assert instance is not None, \
3846
      "Cannot retrieve locked instance %s" % self.op.instance_name
3847
    self.instance = instance
3848

    
3849
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3850
      raise errors.OpPrereqError("Instance's disk layout is not"
3851
                                 " network mirrored.")
3852

    
3853
    if len(instance.secondary_nodes) != 1:
3854
      raise errors.OpPrereqError("The instance has a strange layout,"
3855
                                 " expected one secondary but found %d" %
3856
                                 len(instance.secondary_nodes))
3857

    
3858
    self.sec_node = instance.secondary_nodes[0]
3859

    
3860
    ia_name = getattr(self.op, "iallocator", None)
3861
    if ia_name is not None:
3862
      self._RunAllocator()
3863

    
3864
    remote_node = self.op.remote_node
3865
    if remote_node is not None:
3866
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3867
      assert self.remote_node_info is not None, \
3868
        "Cannot retrieve locked node %s" % remote_node
3869
    else:
3870
      self.remote_node_info = None
3871
    if remote_node == instance.primary_node:
3872
      raise errors.OpPrereqError("The specified node is the primary node of"
3873
                                 " the instance.")
3874
    elif remote_node == self.sec_node:
3875
      if self.op.mode == constants.REPLACE_DISK_SEC:
3876
        # this is for DRBD8, where we can't execute the same mode of
3877
        # replacement as for drbd7 (no different port allocated)
3878
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3879
                                   " replacement")
3880
    if instance.disk_template == constants.DT_DRBD8:
3881
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3882
          remote_node is not None):
3883
        # switch to replace secondary mode
3884
        self.op.mode = constants.REPLACE_DISK_SEC
3885

    
3886
      if self.op.mode == constants.REPLACE_DISK_ALL:
3887
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3888
                                   " secondary disk replacement, not"
3889
                                   " both at once")
3890
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3891
        if remote_node is not None:
3892
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3893
                                     " the secondary while doing a primary"
3894
                                     " node disk replacement")
3895
        self.tgt_node = instance.primary_node
3896
        self.oth_node = instance.secondary_nodes[0]
3897
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3898
        self.new_node = remote_node # this can be None, in which case
3899
                                    # we don't change the secondary
3900
        self.tgt_node = instance.secondary_nodes[0]
3901
        self.oth_node = instance.primary_node
3902
      else:
3903
        raise errors.ProgrammerError("Unhandled disk replace mode")
3904

    
3905
    for name in self.op.disks:
3906
      if instance.FindDisk(name) is None:
3907
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3908
                                   (name, instance.name))
3909

    
3910
  def _ExecD8DiskOnly(self, feedback_fn):
3911
    """Replace a disk on the primary or secondary for dbrd8.
3912

3913
    The algorithm for replace is quite complicated:
3914
      - for each disk to be replaced:
3915
        - create new LVs on the target node with unique names
3916
        - detach old LVs from the drbd device
3917
        - rename old LVs to name_replaced.<time_t>
3918
        - rename new LVs to old LVs
3919
        - attach the new LVs (with the old names now) to the drbd device
3920
      - wait for sync across all devices
3921
      - for each modified disk:
3922
        - remove old LVs (which have the name name_replaced.<time_t>)
3923

3924
    Failures are not very well handled.
3925

3926
    """
3927
    steps_total = 6
3928
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3929
    instance = self.instance
3930
    iv_names = {}
3931
    vgname = self.cfg.GetVGName()
3932
    # start of work
3933
    cfg = self.cfg
3934
    tgt_node = self.tgt_node
3935
    oth_node = self.oth_node
3936

    
3937
    # Step: check device activation
3938
    self.proc.LogStep(1, steps_total, "check device existence")
3939
    info("checking volume groups")
3940
    my_vg = cfg.GetVGName()
3941
    results = self.rpc.call_vg_list([oth_node, tgt_node])
3942
    if not results:
3943
      raise errors.OpExecError("Can't list volume groups on the nodes")
3944
    for node in oth_node, tgt_node:
3945
      res = results.get(node, False)
3946
      if not res or my_vg not in res:
3947
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3948
                                 (my_vg, node))
3949
    for dev in instance.disks:
3950
      if not dev.iv_name in self.op.disks:
3951
        continue
3952
      for node in tgt_node, oth_node:
3953
        info("checking %s on %s" % (dev.iv_name, node))
3954
        cfg.SetDiskID(dev, node)
3955
        if not self.rpc.call_blockdev_find(node, dev):
3956
          raise errors.OpExecError("Can't find device %s on node %s" %
3957
                                   (dev.iv_name, node))
3958

    
3959
    # Step: check other node consistency
3960
    self.proc.LogStep(2, steps_total, "check peer consistency")
3961
    for dev in instance.disks:
3962
      if not dev.iv_name in self.op.disks:
3963
        continue
3964
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3965
      if not _CheckDiskConsistency(self, dev, oth_node,
3966
                                   oth_node == instance.primary_node):
3967
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3968
                                 " to replace disks on this node (%s)" %
3969
                                 (oth_node, tgt_node))
3970

    
3971
    # Step: create new storage
3972
    self.proc.LogStep(3, steps_total, "allocate new storage")
3973
    for dev in instance.disks:
3974
      if not dev.iv_name in self.op.disks:
3975
        continue
3976
      size = dev.size
3977
      cfg.SetDiskID(dev, tgt_node)
3978
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3979
      names = _GenerateUniqueNames(self, lv_names)
3980
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3981
                             logical_id=(vgname, names[0]))
3982
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3983
                             logical_id=(vgname, names[1]))
3984
      new_lvs = [lv_data, lv_meta]
3985
      old_lvs = dev.children
3986
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3987
      info("creating new local storage on %s for %s" %
3988
           (tgt_node, dev.iv_name))
3989
      # since we *always* want to create this LV, we use the
3990
      # _Create...OnPrimary (which forces the creation), even if we
3991
      # are talking about the secondary node
3992
      for new_lv in new_lvs:
3993
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3994
                                        _GetInstanceInfoText(instance)):
3995
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3996
                                   " node '%s'" %
3997
                                   (new_lv.logical_id[1], tgt_node))
3998

    
3999
    # Step: for each lv, detach+rename*2+attach
4000
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4001
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4002
      info("detaching %s drbd from local storage" % dev.iv_name)
4003
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4004
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4005
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4006
      #dev.children = []
4007
      #cfg.Update(instance)
4008

    
4009
      # ok, we created the new LVs, so now we know we have the needed
4010
      # storage; as such, we proceed on the target node to rename
4011
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4012
      # using the assumption that logical_id == physical_id (which in
4013
      # turn is the unique_id on that node)
4014

    
4015
      # FIXME(iustin): use a better name for the replaced LVs
4016
      temp_suffix = int(time.time())
4017
      ren_fn = lambda d, suff: (d.physical_id[0],
4018
                                d.physical_id[1] + "_replaced-%s" % suff)
4019
      # build the rename list based on what LVs exist on the node
4020
      rlist = []
4021
      for to_ren in old_lvs:
4022
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4023
        if find_res is not None: # device exists
4024
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4025

    
4026
      info("renaming the old LVs on the target node")
4027
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4028
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4029
      # now we rename the new LVs to the old LVs
4030
      info("renaming the new LVs on the target node")
4031
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4032
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4033
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4034

    
4035
      for old, new in zip(old_lvs, new_lvs):
4036
        new.logical_id = old.logical_id
4037
        cfg.SetDiskID(new, tgt_node)
4038

    
4039
      for disk in old_lvs:
4040
        disk.logical_id = ren_fn(disk, temp_suffix)
4041
        cfg.SetDiskID(disk, tgt_node)
4042

    
4043
      # now that the new lvs have the old name, we can add them to the device
4044
      info("adding new mirror component on %s" % tgt_node)
4045
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4046
        for new_lv in new_lvs:
4047
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4048
            warning("Can't rollback device %s", hint="manually cleanup unused"
4049
                    " logical volumes")
4050
        raise errors.OpExecError("Can't add local storage to drbd")
4051

    
4052
      dev.children = new_lvs
4053
      cfg.Update(instance)
4054

    
4055
    # Step: wait for sync
4056

    
4057
    # this can fail as the old devices are degraded and _WaitForSync
4058
    # does a combined result over all disks, so we don't check its
4059
    # return value
4060
    self.proc.LogStep(5, steps_total, "sync devices")
4061
    _WaitForSync(self, instance, unlock=True)
4062

    
4063
    # so check manually all the devices
4064
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4065
      cfg.SetDiskID(dev, instance.primary_node)
4066
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4067
      if is_degr:
4068
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4069

    
4070
    # Step: remove old storage
4071
    self.proc.LogStep(6, steps_total, "removing old storage")
4072
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4073
      info("remove logical volumes for %s" % name)
4074
      for lv in old_lvs:
4075
        cfg.SetDiskID(lv, tgt_node)
4076
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
4077
          warning("Can't remove old LV", hint="manually remove unused LVs")
4078
          continue
4079

    
4080
  def _ExecD8Secondary(self, feedback_fn):
4081
    """Replace the secondary node for drbd8.
4082

4083
    The algorithm for replace is quite complicated:
4084
      - for all disks of the instance:
4085
        - create new LVs on the new node with same names
4086
        - shutdown the drbd device on the old secondary
4087
        - disconnect the drbd network on the primary
4088
        - create the drbd device on the new secondary
4089
        - network attach the drbd on the primary, using an artifice:
4090
          the drbd code for Attach() will connect to the network if it
4091
          finds a device which is connected to the good local disks but
4092
          not network enabled
4093
      - wait for sync across all devices
4094
      - remove all disks from the old secondary
4095

4096
    Failures are not very well handled.
4097

4098
    """
4099
    steps_total = 6
4100
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4101
    instance = self.instance
4102
    iv_names = {}
4103
    vgname = self.cfg.GetVGName()
4104
    # start of work
4105
    cfg = self.cfg
4106
    old_node = self.tgt_node
4107
    new_node = self.new_node
4108
    pri_node = instance.primary_node
4109

    
4110
    # Step: check device activation
4111
    self.proc.LogStep(1, steps_total, "check device existence")
4112
    info("checking volume groups")
4113
    my_vg = cfg.GetVGName()
4114
    results = self.rpc.call_vg_list([pri_node, new_node])
4115
    if not results:
4116
      raise errors.OpExecError("Can't list volume groups on the nodes")
4117
    for node in pri_node, new_node:
4118
      res = results.get(node, False)
4119
      if not res or my_vg not in res:
4120
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4121
                                 (my_vg, node))
4122
    for dev in instance.disks:
4123
      if not dev.iv_name in self.op.disks:
4124
        continue
4125
      info("checking %s on %s" % (dev.iv_name, pri_node))
4126
      cfg.SetDiskID(dev, pri_node)
4127
      if not self.rpc.call_blockdev_find(pri_node, dev):
4128
        raise errors.OpExecError("Can't find device %s on node %s" %
4129
                                 (dev.iv_name, pri_node))
4130

    
4131
    # Step: check other node consistency
4132
    self.proc.LogStep(2, steps_total, "check peer consistency")
4133
    for dev in instance.disks:
4134
      if not dev.iv_name in self.op.disks:
4135
        continue
4136
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4137
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4138
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4139
                                 " unsafe to replace the secondary" %
4140
                                 pri_node)
4141

    
4142
    # Step: create new storage
4143
    self.proc.LogStep(3, steps_total, "allocate new storage")
4144
    for dev in instance.disks:
4145
      size = dev.size
4146
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4147
      # since we *always* want to create this LV, we use the
4148
      # _Create...OnPrimary (which forces the creation), even if we
4149
      # are talking about the secondary node
4150
      for new_lv in dev.children:
4151
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4152
                                        _GetInstanceInfoText(instance)):
4153
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4154
                                   " node '%s'" %
4155
                                   (new_lv.logical_id[1], new_node))
4156

    
4157

    
4158
    # Step 4: drbd minors and drbd setup changes
4159
    # after this, we must manually remove the drbd minors on both the
4160
    # error and the success paths
4161
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4162
                                   instance.name)
4163
    logging.debug("Allocated minors %s" % (minors,))
4164
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4165
    for dev, new_minor in zip(instance.disks, minors):
4166
      size = dev.size
4167
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4168
      # create new devices on new_node
4169
      if pri_node == dev.logical_id[0]:
4170
        new_logical_id = (pri_node, new_node,
4171
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4172
                          dev.logical_id[5])
4173
      else:
4174
        new_logical_id = (new_node, pri_node,
4175
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4176
                          dev.logical_id[5])
4177
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4178
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4179
                    new_logical_id)
4180
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4181
                              logical_id=new_logical_id,
4182
                              children=dev.children)
4183
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4184
                                        new_drbd, False,
4185
                                        _GetInstanceInfoText(instance)):
4186
        self.cfg.ReleaseDRBDMinors(instance.name)
4187
        raise errors.OpExecError("Failed to create new DRBD on"
4188
                                 " node '%s'" % new_node)
4189

    
4190
    for dev in instance.disks:
4191
      # we have new devices, shutdown the drbd on the old secondary
4192
      info("shutting down drbd for %s on old node" % dev.iv_name)
4193
      cfg.SetDiskID(dev, old_node)
4194
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4195
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4196
                hint="Please cleanup this device manually as soon as possible")
4197

    
4198
    info("detaching primary drbds from the network (=> standalone)")
4199
    done = 0
4200
    for dev in instance.disks:
4201
      cfg.SetDiskID(dev, pri_node)
4202
      # set the network part of the physical (unique in bdev terms) id
4203
      # to None, meaning detach from network
4204
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4205
      # and 'find' the device, which will 'fix' it to match the
4206
      # standalone state
4207
      if self.rpc.call_blockdev_find(pri_node, dev):
4208
        done += 1
4209
      else:
4210
        warning("Failed to detach drbd %s from network, unusual case" %
4211
                dev.iv_name)
4212

    
4213
    if not done:
4214
      # no detaches succeeded (very unlikely)
4215
      self.cfg.ReleaseDRBDMinors(instance.name)
4216
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4217

    
4218
    # if we managed to detach at least one, we update all the disks of
4219
    # the instance to point to the new secondary
4220
    info("updating instance configuration")
4221
    for dev, _, new_logical_id in iv_names.itervalues():
4222
      dev.logical_id = new_logical_id
4223
      cfg.SetDiskID(dev, pri_node)
4224
    cfg.Update(instance)
4225
    # we can remove now the temp minors as now the new values are
4226
    # written to the config file (and therefore stable)
4227
    self.cfg.ReleaseDRBDMinors(instance.name)
4228

    
4229
    # and now perform the drbd attach
4230
    info("attaching primary drbds to new secondary (standalone => connected)")
4231
    failures = []
4232
    for dev in instance.disks:
4233
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4234
      # since the attach is smart, it's enough to 'find' the device,
4235
      # it will automatically activate the network, if the physical_id
4236
      # is correct
4237
      cfg.SetDiskID(dev, pri_node)
4238
      logging.debug("Disk to attach: %s", dev)
4239
      if not self.rpc.call_blockdev_find(pri_node, dev):
4240
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4241
                "please do a gnt-instance info to see the status of disks")
4242

    
4243
    # this can fail as the old devices are degraded and _WaitForSync
4244
    # does a combined result over all disks, so we don't check its
4245
    # return value
4246
    self.proc.LogStep(5, steps_total, "sync devices")
4247
    _WaitForSync(self, instance, unlock=True)
4248

    
4249
    # so check manually all the devices
4250
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4251
      cfg.SetDiskID(dev, pri_node)
4252
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4253
      if is_degr:
4254
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4255

    
4256
    self.proc.LogStep(6, steps_total, "removing old storage")
4257
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4258
      info("remove logical volumes for %s" % name)
4259
      for lv in old_lvs:
4260
        cfg.SetDiskID(lv, old_node)
4261
        if not self.rpc.call_blockdev_remove(old_node, lv):
4262
          warning("Can't remove LV on old secondary",
4263
                  hint="Cleanup stale volumes by hand")
4264

    
4265
  def Exec(self, feedback_fn):
4266
    """Execute disk replacement.
4267

4268
    This dispatches the disk replacement to the appropriate handler.
4269

4270
    """
4271
    instance = self.instance
4272

    
4273
    # Activate the instance disks if we're replacing them on a down instance
4274
    if instance.status == "down":
4275
      _StartInstanceDisks(self, instance, True)
4276

    
4277
    if instance.disk_template == constants.DT_DRBD8:
4278
      if self.op.remote_node is None:
4279
        fn = self._ExecD8DiskOnly
4280
      else:
4281
        fn = self._ExecD8Secondary
4282
    else:
4283
      raise errors.ProgrammerError("Unhandled disk replacement case")
4284

    
4285
    ret = fn(feedback_fn)
4286

    
4287
    # Deactivate the instance disks if we're replacing them on a down instance
4288
    if instance.status == "down":
4289
      _SafeShutdownInstanceDisks(self, instance)
4290

    
4291
    return ret
4292

    
4293

    
4294
class LUGrowDisk(LogicalUnit):
4295
  """Grow a disk of an instance.
4296

4297
  """
4298
  HPATH = "disk-grow"
4299
  HTYPE = constants.HTYPE_INSTANCE
4300
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4301
  REQ_BGL = False
4302

    
4303
  def ExpandNames(self):
4304
    self._ExpandAndLockInstance()
4305
    self.needed_locks[locking.LEVEL_NODE] = []
4306
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4307

    
4308
  def DeclareLocks(self, level):
4309
    if level == locking.LEVEL_NODE:
4310
      self._LockInstancesNodes()
4311

    
4312
  def BuildHooksEnv(self):
4313
    """Build hooks env.
4314

4315
    This runs on the master, the primary and all the secondaries.
4316

4317
    """
4318
    env = {
4319
      "DISK": self.op.disk,
4320
      "AMOUNT": self.op.amount,
4321
      }
4322
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4323
    nl = [
4324
      self.cfg.GetMasterNode(),
4325
      self.instance.primary_node,
4326
      ]
4327
    return env, nl, nl
4328

    
4329
  def CheckPrereq(self):
4330
    """Check prerequisites.
4331

4332
    This checks that the instance is in the cluster.
4333

4334
    """
4335
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4336
    assert instance is not None, \
4337
      "Cannot retrieve locked instance %s" % self.op.instance_name
4338

    
4339
    self.instance = instance
4340

    
4341
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4342
      raise errors.OpPrereqError("Instance's disk layout does not support"
4343
                                 " growing.")
4344

    
4345
    if instance.FindDisk(self.op.disk) is None:
4346
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4347
                                 (self.op.disk, instance.name))
4348

    
4349
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4350
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4351
                                       instance.hypervisor)
4352
    for node in nodenames:
4353
      info = nodeinfo.get(node, None)
4354
      if not info:
4355
        raise errors.OpPrereqError("Cannot get current information"
4356
                                   " from node '%s'" % node)
4357
      vg_free = info.get('vg_free', None)
4358
      if not isinstance(vg_free, int):
4359
        raise errors.OpPrereqError("Can't compute free disk space on"
4360
                                   " node %s" % node)
4361
      if self.op.amount > info['vg_free']:
4362
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4363
                                   " %d MiB available, %d MiB required" %
4364
                                   (node, info['vg_free'], self.op.amount))
4365

    
4366
  def Exec(self, feedback_fn):
4367
    """Execute disk grow.
4368

4369
    """
4370
    instance = self.instance
4371
    disk = instance.FindDisk(self.op.disk)
4372
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4373
      self.cfg.SetDiskID(disk, node)
4374
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4375
      if (not result or not isinstance(result, (list, tuple)) or
4376
          len(result) != 2):
4377
        raise errors.OpExecError("grow request failed to node %s" % node)
4378
      elif not result[0]:
4379
        raise errors.OpExecError("grow request failed to node %s: %s" %
4380
                                 (node, result[1]))
4381
    disk.RecordGrow(self.op.amount)
4382
    self.cfg.Update(instance)
4383
    if self.op.wait_for_sync:
4384
      disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4385
      if disk_abort:
4386
        logging.error("Warning: disk sync-ing has not returned a good"
4387
                      " status.\nPlease check the instance.")
4388

    
4389

    
4390
class LUQueryInstanceData(NoHooksLU):
4391
  """Query runtime instance data.
4392

4393
  """
4394
  _OP_REQP = ["instances", "static"]
4395
  REQ_BGL = False
4396

    
4397
  def ExpandNames(self):
4398
    self.needed_locks = {}
4399
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4400

    
4401
    if not isinstance(self.op.instances, list):
4402
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4403

    
4404
    if self.op.instances:
4405
      self.wanted_names = []
4406
      for name in self.op.instances:
4407
        full_name = self.cfg.ExpandInstanceName(name)
4408
        if full_name is None:
4409
          raise errors.OpPrereqError("Instance '%s' not known" %
4410
                                     self.op.instance_name)
4411
        self.wanted_names.append(full_name)
4412
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4413
    else:
4414
      self.wanted_names = None
4415
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4416

    
4417
    self.needed_locks[locking.LEVEL_NODE] = []
4418
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4419

    
4420
  def DeclareLocks(self, level):
4421
    if level == locking.LEVEL_NODE:
4422
      self._LockInstancesNodes()
4423

    
4424
  def CheckPrereq(self):
4425
    """Check prerequisites.
4426

4427
    This only checks the optional instance list against the existing names.
4428

4429
    """
4430
    if self.wanted_names is None:
4431
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4432

    
4433
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4434
                             in self.wanted_names]
4435
    return
4436

    
4437
  def _ComputeDiskStatus(self, instance, snode, dev):
4438
    """Compute block device status.
4439

4440
    """
4441
    static = self.op.static
4442
    if not static:
4443
      self.cfg.SetDiskID(dev, instance.primary_node)
4444
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4445
    else:
4446
      dev_pstatus = None
4447

    
4448
    if dev.dev_type in constants.LDS_DRBD:
4449
      # we change the snode then (otherwise we use the one passed in)
4450
      if dev.logical_id[0] == instance.primary_node:
4451
        snode = dev.logical_id[1]
4452
      else:
4453
        snode = dev.logical_id[0]
4454

    
4455
    if snode and not static:
4456
      self.cfg.SetDiskID(dev, snode)
4457
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4458
    else:
4459
      dev_sstatus = None
4460

    
4461
    if dev.children:
4462
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4463
                      for child in dev.children]
4464
    else:
4465
      dev_children = []
4466

    
4467
    data = {
4468
      "iv_name": dev.iv_name,
4469
      "dev_type": dev.dev_type,
4470
      "logical_id": dev.logical_id,
4471
      "physical_id": dev.physical_id,
4472
      "pstatus": dev_pstatus,
4473
      "sstatus": dev_sstatus,
4474
      "children": dev_children,
4475
      }
4476

    
4477
    return data
4478

    
4479
  def Exec(self, feedback_fn):
4480
    """Gather and return data"""
4481
    result = {}
4482

    
4483
    cluster = self.cfg.GetClusterInfo()
4484

    
4485
    for instance in self.wanted_instances:
4486
      if not self.op.static:
4487
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4488
                                                  instance.name,
4489
                                                  instance.hypervisor)
4490
        if remote_info and "state" in remote_info:
4491
          remote_state = "up"
4492
        else:
4493
          remote_state = "down"
4494
      else:
4495
        remote_state = None
4496
      if instance.status == "down":
4497
        config_state = "down"
4498
      else:
4499
        config_state = "up"
4500

    
4501
      disks = [self._ComputeDiskStatus(instance, None, device)
4502
               for device in instance.disks]
4503

    
4504
      idict = {
4505
        "name": instance.name,
4506
        "config_state": config_state,
4507
        "run_state": remote_state,
4508
        "pnode": instance.primary_node,
4509
        "snodes": instance.secondary_nodes,
4510
        "os": instance.os,
4511
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4512
        "disks": disks,
4513
        "hypervisor": instance.hypervisor,
4514
        "network_port": instance.network_port,
4515
        "hv_instance": instance.hvparams,
4516
        "hv_actual": cluster.FillHV(instance),
4517
        "be_instance": instance.beparams,
4518
        "be_actual": cluster.FillBE(instance),
4519
        }
4520

    
4521
      result[instance.name] = idict
4522

    
4523
    return result
4524

    
4525

    
4526
class LUSetInstanceParams(LogicalUnit):
4527
  """Modifies an instances's parameters.
4528

4529
  """
4530
  HPATH = "instance-modify"
4531
  HTYPE = constants.HTYPE_INSTANCE
4532
  _OP_REQP = ["instance_name", "hvparams"]
4533
  REQ_BGL = False
4534

    
4535
  def ExpandNames(self):
4536
    self._ExpandAndLockInstance()
4537
    self.needed_locks[locking.LEVEL_NODE] = []
4538
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4539

    
4540

    
4541
  def DeclareLocks(self, level):
4542
    if level == locking.LEVEL_NODE:
4543
      self._LockInstancesNodes()
4544

    
4545
  def BuildHooksEnv(self):
4546
    """Build hooks env.
4547

4548
    This runs on the master, primary and secondaries.
4549

4550
    """
4551
    args = dict()
4552
    if constants.BE_MEMORY in self.be_new:
4553
      args['memory'] = self.be_new[constants.BE_MEMORY]
4554
    if constants.BE_VCPUS in self.be_new:
4555
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4556
    if self.do_ip or self.do_bridge or self.mac:
4557
      if self.do_ip:
4558
        ip = self.ip
4559
      else:
4560
        ip = self.instance.nics[0].ip
4561
      if self.bridge:
4562
        bridge = self.bridge
4563
      else:
4564
        bridge = self.instance.nics[0].bridge
4565
      if self.mac:
4566
        mac = self.mac
4567
      else:
4568
        mac = self.instance.nics[0].mac
4569
      args['nics'] = [(ip, bridge, mac)]
4570
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4571
    nl = [self.cfg.GetMasterNode(),
4572
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4573
    return env, nl, nl
4574

    
4575
  def CheckPrereq(self):
4576
    """Check prerequisites.
4577

4578
    This only checks the instance list against the existing names.
4579

4580
    """
4581
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4582
    # a separate CheckArguments function, if we implement one, so the operation
4583
    # can be aborted without waiting for any lock, should it have an error...
4584
    self.ip = getattr(self.op, "ip", None)
4585
    self.mac = getattr(self.op, "mac", None)
4586
    self.bridge = getattr(self.op, "bridge", None)
4587
    self.kernel_path = getattr(self.op, "kernel_path", None)
4588
    self.initrd_path = getattr(self.op, "initrd_path", None)
4589
    self.force = getattr(self.op, "force", None)
4590
    all_parms = [self.ip, self.bridge, self.mac]
4591
    if (all_parms.count(None) == len(all_parms) and
4592
        not self.op.hvparams and
4593
        not self.op.beparams):
4594
      raise errors.OpPrereqError("No changes submitted")
4595
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4596
      val = self.op.beparams.get(item, None)
4597
      if val is not None:
4598
        try:
4599
          val = int(val)
4600
        except ValueError, err:
4601
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4602
        self.op.beparams[item] = val
4603
    if self.ip is not None:
4604
      self.do_ip = True
4605
      if self.ip.lower() == "none":
4606
        self.ip = None
4607
      else:
4608
        if not utils.IsValidIP(self.ip):
4609
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4610
    else:
4611
      self.do_ip = False
4612
    self.do_bridge = (self.bridge is not None)
4613
    if self.mac is not None:
4614
      if self.cfg.IsMacInUse(self.mac):
4615
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4616
                                   self.mac)
4617
      if not utils.IsValidMac(self.mac):
4618
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4619

    
4620
    # checking the new params on the primary/secondary nodes
4621

    
4622
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4623
    assert self.instance is not None, \
4624
      "Cannot retrieve locked instance %s" % self.op.instance_name
4625
    pnode = self.instance.primary_node
4626
    nodelist = [pnode]
4627
    nodelist.extend(instance.secondary_nodes)
4628

    
4629
    # hvparams processing
4630
    if self.op.hvparams:
4631
      i_hvdict = copy.deepcopy(instance.hvparams)
4632
      for key, val in self.op.hvparams.iteritems():
4633
        if val is None:
4634
          try:
4635
            del i_hvdict[key]
4636
          except KeyError:
4637
            pass
4638
        else:
4639
          i_hvdict[key] = val
4640
      cluster = self.cfg.GetClusterInfo()
4641
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4642
                                i_hvdict)
4643
      # local check
4644
      hypervisor.GetHypervisor(
4645
        instance.hypervisor).CheckParameterSyntax(hv_new)
4646
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4647
      self.hv_new = hv_new # the new actual values
4648
      self.hv_inst = i_hvdict # the new dict (without defaults)
4649
    else:
4650
      self.hv_new = self.hv_inst = {}
4651

    
4652
    # beparams processing
4653
    if self.op.beparams:
4654
      i_bedict = copy.deepcopy(instance.beparams)
4655
      for key, val in self.op.beparams.iteritems():
4656
        if val is None:
4657
          try:
4658
            del i_bedict[key]
4659
          except KeyError:
4660
            pass
4661
        else:
4662
          i_bedict[key] = val
4663
      cluster = self.cfg.GetClusterInfo()
4664
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4665
                                i_bedict)
4666
      self.be_new = be_new # the new actual values
4667
      self.be_inst = i_bedict # the new dict (without defaults)
4668
    else:
4669
      self.hv_new = self.hv_inst = {}
4670

    
4671
    self.warn = []
4672

    
4673
    if constants.BE_MEMORY in self.op.beparams and not self.force:
4674
      mem_check_list = [pnode]
4675
      if be_new[constants.BE_AUTO_BALANCE]:
4676
        # either we changed auto_balance to yes or it was from before
4677
        mem_check_list.extend(instance.secondary_nodes)
4678
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4679
                                                  instance.hypervisor)
4680
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4681
                                         instance.hypervisor)
4682

    
4683
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4684
        # Assume the primary node is unreachable and go ahead
4685
        self.warn.append("Can't get info from primary node %s" % pnode)
4686
      else:
4687
        if instance_info:
4688
          current_mem = instance_info['memory']
4689
        else:
4690
          # Assume instance not running
4691
          # (there is a slight race condition here, but it's not very probable,
4692
          # and we have no other way to check)
4693
          current_mem = 0
4694
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4695
                    nodeinfo[pnode]['memory_free'])
4696
        if miss_mem > 0:
4697
          raise errors.OpPrereqError("This change will prevent the instance"
4698
                                     " from starting, due to %d MB of memory"
4699
                                     " missing on its primary node" % miss_mem)
4700

    
4701
      if be_new[constants.BE_AUTO_BALANCE]:
4702
        for node in instance.secondary_nodes:
4703
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4704
            self.warn.append("Can't get info from secondary node %s" % node)
4705
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4706
            self.warn.append("Not enough memory to failover instance to"
4707
                             " secondary node %s" % node)
4708

    
4709
    return
4710

    
4711
  def Exec(self, feedback_fn):
4712
    """Modifies an instance.
4713

4714
    All parameters take effect only at the next restart of the instance.
4715
    """
4716
    # Process here the warnings from CheckPrereq, as we don't have a
4717
    # feedback_fn there.
4718
    for warn in self.warn:
4719
      feedback_fn("WARNING: %s" % warn)
4720

    
4721
    result = []
4722
    instance = self.instance
4723
    if self.do_ip:
4724
      instance.nics[0].ip = self.ip
4725
      result.append(("ip", self.ip))
4726
    if self.bridge:
4727
      instance.nics[0].bridge = self.bridge
4728
      result.append(("bridge", self.bridge))
4729
    if self.mac:
4730
      instance.nics[0].mac = self.mac
4731
      result.append(("mac", self.mac))
4732
    if self.op.hvparams:
4733
      instance.hvparams = self.hv_new
4734
      for key, val in self.op.hvparams.iteritems():
4735
        result.append(("hv/%s" % key, val))
4736
    if self.op.beparams:
4737
      instance.beparams = self.be_inst
4738
      for key, val in self.op.beparams.iteritems():
4739
        result.append(("be/%s" % key, val))
4740

    
4741
    self.cfg.Update(instance)
4742

    
4743
    return result
4744

    
4745

    
4746
class LUQueryExports(NoHooksLU):
4747
  """Query the exports list
4748

4749
  """
4750
  _OP_REQP = ['nodes']
4751
  REQ_BGL = False
4752

    
4753
  def ExpandNames(self):
4754
    self.needed_locks = {}
4755
    self.share_locks[locking.LEVEL_NODE] = 1
4756
    if not self.op.nodes:
4757
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4758
    else:
4759
      self.needed_locks[locking.LEVEL_NODE] = \
4760
        _GetWantedNodes(self, self.op.nodes)
4761

    
4762
  def CheckPrereq(self):
4763
    """Check prerequisites.
4764

4765
    """
4766
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4767

    
4768
  def Exec(self, feedback_fn):
4769
    """Compute the list of all the exported system images.
4770

4771
    Returns:
4772
      a dictionary with the structure node->(export-list)
4773
      where export-list is a list of the instances exported on
4774
      that node.
4775

4776
    """
4777
    return self.rpc.call_export_list(self.nodes)
4778

    
4779

    
4780
class LUExportInstance(LogicalUnit):
4781
  """Export an instance to an image in the cluster.
4782

4783
  """
4784
  HPATH = "instance-export"
4785
  HTYPE = constants.HTYPE_INSTANCE
4786
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4787
  REQ_BGL = False
4788

    
4789
  def ExpandNames(self):
4790
    self._ExpandAndLockInstance()
4791
    # FIXME: lock only instance primary and destination node
4792
    #
4793
    # Sad but true, for now we have do lock all nodes, as we don't know where
4794
    # the previous export might be, and and in this LU we search for it and
4795
    # remove it from its current node. In the future we could fix this by:
4796
    #  - making a tasklet to search (share-lock all), then create the new one,
4797
    #    then one to remove, after
4798
    #  - removing the removal operation altoghether
4799
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4800

    
4801
  def DeclareLocks(self, level):
4802
    """Last minute lock declaration."""
4803
    # All nodes are locked anyway, so nothing to do here.
4804

    
4805
  def BuildHooksEnv(self):
4806
    """Build hooks env.
4807

4808
    This will run on the master, primary node and target node.
4809

4810
    """
4811
    env = {
4812
      "EXPORT_NODE": self.op.target_node,
4813
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4814
      }
4815
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4816
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4817
          self.op.target_node]
4818
    return env, nl, nl
4819

    
4820
  def CheckPrereq(self):
4821
    """Check prerequisites.
4822

4823
    This checks that the instance and node names are valid.
4824

4825
    """
4826
    instance_name = self.op.instance_name
4827
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4828
    assert self.instance is not None, \
4829
          "Cannot retrieve locked instance %s" % self.op.instance_name
4830

    
4831
    self.dst_node = self.cfg.GetNodeInfo(
4832
      self.cfg.ExpandNodeName(self.op.target_node))
4833

    
4834
    assert self.dst_node is not None, \
4835
          "Cannot retrieve locked node %s" % self.op.target_node
4836

    
4837
    # instance disk type verification
4838
    for disk in self.instance.disks:
4839
      if disk.dev_type == constants.LD_FILE:
4840
        raise errors.OpPrereqError("Export not supported for instances with"
4841
                                   " file-based disks")
4842

    
4843
  def Exec(self, feedback_fn):
4844
    """Export an instance to an image in the cluster.
4845

4846
    """
4847
    instance = self.instance
4848
    dst_node = self.dst_node
4849
    src_node = instance.primary_node
4850
    if self.op.shutdown:
4851
      # shutdown the instance, but not the disks
4852
      if not self.rpc.call_instance_shutdown(src_node, instance):
4853
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4854
                                 (instance.name, src_node))
4855

    
4856
    vgname = self.cfg.GetVGName()
4857

    
4858
    snap_disks = []
4859

    
4860
    try:
4861
      for disk in instance.disks:
4862
        if disk.iv_name == "sda":
4863
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4864
          new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4865

    
4866
          if not new_dev_name:
4867
            logging.error("Could not snapshot block device %s on node %s",
4868
                          disk.logical_id[1], src_node)
4869
          else:
4870
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4871
                                      logical_id=(vgname, new_dev_name),
4872
                                      physical_id=(vgname, new_dev_name),
4873
                                      iv_name=disk.iv_name)
4874
            snap_disks.append(new_dev)
4875

    
4876
    finally:
4877
      if self.op.shutdown and instance.status == "up":
4878
        if not self.rpc.call_instance_start(src_node, instance, None):
4879
          _ShutdownInstanceDisks(self, instance)
4880
          raise errors.OpExecError("Could not start instance")
4881

    
4882
    # TODO: check for size
4883

    
4884
    cluster_name = self.cfg.GetClusterName()
4885
    for dev in snap_disks:
4886
      if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4887
                                      instance, cluster_name):
4888
        logging.error("Could not export block device %s from node %s to"
4889
                      " node %s", dev.logical_id[1], src_node, dst_node.name)
4890
      if not self.rpc.call_blockdev_remove(src_node, dev):
4891
        logging.error("Could not remove snapshot block device %s from node"
4892
                      " %s", dev.logical_id[1], src_node)
4893

    
4894
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4895
      logging.error("Could not finalize export for instance %s on node %s",
4896
                    instance.name, dst_node.name)
4897

    
4898
    nodelist = self.cfg.GetNodeList()
4899
    nodelist.remove(dst_node.name)
4900

    
4901
    # on one-node clusters nodelist will be empty after the removal
4902
    # if we proceed the backup would be removed because OpQueryExports
4903
    # substitutes an empty list with the full cluster node list.
4904
    if nodelist:
4905
      exportlist = self.rpc.call_export_list(nodelist)
4906
      for node in exportlist:
4907
        if instance.name in exportlist[node]:
4908
          if not self.rpc.call_export_remove(node, instance.name):
4909
            logging.error("Could not remove older export for instance %s"
4910
                          " on node %s", instance.name, node)
4911

    
4912

    
4913
class LURemoveExport(NoHooksLU):
4914
  """Remove exports related to the named instance.
4915

4916
  """
4917
  _OP_REQP = ["instance_name"]
4918
  REQ_BGL = False
4919

    
4920
  def ExpandNames(self):
4921
    self.needed_locks = {}
4922
    # We need all nodes to be locked in order for RemoveExport to work, but we
4923
    # don't need to lock the instance itself, as nothing will happen to it (and
4924
    # we can remove exports also for a removed instance)
4925
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4926

    
4927
  def CheckPrereq(self):
4928
    """Check prerequisites.
4929
    """
4930
    pass
4931

    
4932
  def Exec(self, feedback_fn):
4933
    """Remove any export.
4934

4935
    """
4936
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4937
    # If the instance was not found we'll try with the name that was passed in.
4938
    # This will only work if it was an FQDN, though.
4939
    fqdn_warn = False
4940
    if not instance_name:
4941
      fqdn_warn = True
4942
      instance_name = self.op.instance_name
4943

    
4944
    exportlist = self.rpc.call_export_list(self.acquired_locks[
4945
      locking.LEVEL_NODE])
4946
    found = False
4947
    for node in exportlist:
4948
      if instance_name in exportlist[node]:
4949
        found = True
4950
        if not self.rpc.call_export_remove(node, instance_name):
4951
          logging.error("Could not remove export for instance %s"
4952
                        " on node %s", instance_name, node)
4953

    
4954
    if fqdn_warn and not found:
4955
      feedback_fn("Export not found. If trying to remove an export belonging"
4956
                  " to a deleted instance please use its Fully Qualified"
4957
                  " Domain Name.")
4958

    
4959

    
4960
class TagsLU(NoHooksLU):
4961
  """Generic tags LU.
4962

4963
  This is an abstract class which is the parent of all the other tags LUs.
4964

4965
  """
4966

    
4967
  def ExpandNames(self):
4968
    self.needed_locks = {}
4969
    if self.op.kind == constants.TAG_NODE:
4970
      name = self.cfg.ExpandNodeName(self.op.name)
4971
      if name is None:
4972
        raise errors.OpPrereqError("Invalid node name (%s)" %
4973
                                   (self.op.name,))
4974
      self.op.name = name
4975
      self.needed_locks[locking.LEVEL_NODE] = name
4976
    elif self.op.kind == constants.TAG_INSTANCE:
4977
      name = self.cfg.ExpandInstanceName(self.op.name)
4978
      if name is None:
4979
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4980
                                   (self.op.name,))
4981
      self.op.name = name
4982
      self.needed_locks[locking.LEVEL_INSTANCE] = name
4983

    
4984
  def CheckPrereq(self):
4985
    """Check prerequisites.
4986

4987
    """
4988
    if self.op.kind == constants.TAG_CLUSTER:
4989
      self.target = self.cfg.GetClusterInfo()
4990
    elif self.op.kind == constants.TAG_NODE:
4991
      self.target = self.cfg.GetNodeInfo(self.op.name)
4992
    elif self.op.kind == constants.TAG_INSTANCE:
4993
      self.target = self.cfg.GetInstanceInfo(self.op.name)
4994
    else:
4995
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4996
                                 str(self.op.kind))
4997

    
4998

    
4999
class LUGetTags(TagsLU):
5000
  """Returns the tags of a given object.
5001

5002
  """
5003
  _OP_REQP = ["kind", "name"]
5004
  REQ_BGL = False
5005

    
5006
  def Exec(self, feedback_fn):
5007
    """Returns the tag list.
5008

5009
    """
5010
    return list(self.target.GetTags())
5011

    
5012

    
5013
class LUSearchTags(NoHooksLU):
5014
  """Searches the tags for a given pattern.
5015

5016
  """
5017
  _OP_REQP = ["pattern"]
5018
  REQ_BGL = False
5019

    
5020
  def ExpandNames(self):
5021
    self.needed_locks = {}
5022

    
5023
  def CheckPrereq(self):
5024
    """Check prerequisites.
5025

5026
    This checks the pattern passed for validity by compiling it.
5027

5028
    """
5029
    try:
5030
      self.re = re.compile(self.op.pattern)
5031
    except re.error, err:
5032
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5033
                                 (self.op.pattern, err))
5034

    
5035
  def Exec(self, feedback_fn):
5036
    """Returns the tag list.
5037

5038
    """
5039
    cfg = self.cfg
5040
    tgts = [("/cluster", cfg.GetClusterInfo())]
5041
    ilist = cfg.GetAllInstancesInfo().values()
5042
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5043
    nlist = cfg.GetAllNodesInfo().values()
5044
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5045
    results = []
5046
    for path, target in tgts:
5047
      for tag in target.GetTags():
5048
        if self.re.search(tag):
5049
          results.append((path, tag))
5050
    return results
5051

    
5052

    
5053
class LUAddTags(TagsLU):
5054
  """Sets a tag on a given object.
5055

5056
  """
5057
  _OP_REQP = ["kind", "name", "tags"]
5058
  REQ_BGL = False
5059

    
5060
  def CheckPrereq(self):
5061
    """Check prerequisites.
5062

5063
    This checks the type and length of the tag name and value.
5064

5065
    """
5066
    TagsLU.CheckPrereq(self)
5067
    for tag in self.op.tags:
5068
      objects.TaggableObject.ValidateTag(tag)
5069

    
5070
  def Exec(self, feedback_fn):
5071
    """Sets the tag.
5072

5073
    """
5074
    try:
5075
      for tag in self.op.tags:
5076
        self.target.AddTag(tag)
5077
    except errors.TagError, err:
5078
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5079
    try:
5080
      self.cfg.Update(self.target)
5081
    except errors.ConfigurationError:
5082
      raise errors.OpRetryError("There has been a modification to the"
5083
                                " config file and the operation has been"
5084
                                " aborted. Please retry.")
5085

    
5086

    
5087
class LUDelTags(TagsLU):
5088
  """Delete a list of tags from a given object.
5089

5090
  """
5091
  _OP_REQP = ["kind", "name", "tags"]
5092
  REQ_BGL = False
5093

    
5094
  def CheckPrereq(self):
5095
    """Check prerequisites.
5096

5097
    This checks that we have the given tag.
5098

5099
    """
5100
    TagsLU.CheckPrereq(self)
5101
    for tag in self.op.tags:
5102
      objects.TaggableObject.ValidateTag(tag)
5103
    del_tags = frozenset(self.op.tags)
5104
    cur_tags = self.target.GetTags()
5105
    if not del_tags <= cur_tags:
5106
      diff_tags = del_tags - cur_tags
5107
      diff_names = ["'%s'" % tag for tag in diff_tags]
5108
      diff_names.sort()
5109
      raise errors.OpPrereqError("Tag(s) %s not found" %
5110
                                 (",".join(diff_names)))
5111

    
5112
  def Exec(self, feedback_fn):
5113
    """Remove the tag from the object.
5114

5115
    """
5116
    for tag in self.op.tags:
5117
      self.target.RemoveTag(tag)
5118
    try:
5119
      self.cfg.Update(self.target)
5120
    except errors.ConfigurationError:
5121
      raise errors.OpRetryError("There has been a modification to the"
5122
                                " config file and the operation has been"
5123
                                " aborted. Please retry.")
5124

    
5125

    
5126
class LUTestDelay(NoHooksLU):
5127
  """Sleep for a specified amount of time.
5128

5129
  This LU sleeps on the master and/or nodes for a specified amount of
5130
  time.
5131

5132
  """
5133
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5134
  REQ_BGL = False
5135

    
5136
  def ExpandNames(self):
5137
    """Expand names and set required locks.
5138

5139
    This expands the node list, if any.
5140

5141
    """
5142
    self.needed_locks = {}
5143
    if self.op.on_nodes:
5144
      # _GetWantedNodes can be used here, but is not always appropriate to use
5145
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5146
      # more information.
5147
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5148
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5149

    
5150
  def CheckPrereq(self):
5151
    """Check prerequisites.
5152

5153
    """
5154

    
5155
  def Exec(self, feedback_fn):
5156
    """Do the actual sleep.
5157

5158
    """
5159
    if self.op.on_master:
5160
      if not utils.TestDelay(self.op.duration):
5161
        raise errors.OpExecError("Error during master delay test")
5162
    if self.op.on_nodes:
5163
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5164
      if not result:
5165
        raise errors.OpExecError("Complete failure from rpc call")
5166
      for node, node_result in result.items():
5167
        if not node_result:
5168
          raise errors.OpExecError("Failure during rpc call to node %s,"
5169
                                   " result: %s" % (node, node_result))
5170

    
5171

    
5172
class IAllocator(object):
5173
  """IAllocator framework.
5174

5175
  An IAllocator instance has three sets of attributes:
5176
    - cfg that is needed to query the cluster
5177
    - input data (all members of the _KEYS class attribute are required)
5178
    - four buffer attributes (in|out_data|text), that represent the
5179
      input (to the external script) in text and data structure format,
5180
      and the output from it, again in two formats
5181
    - the result variables from the script (success, info, nodes) for
5182
      easy usage
5183

5184
  """
5185
  _ALLO_KEYS = [
5186
    "mem_size", "disks", "disk_template",
5187
    "os", "tags", "nics", "vcpus",
5188
    ]
5189
  _RELO_KEYS = [
5190
    "relocate_from",
5191
    ]
5192

    
5193
  def __init__(self, lu, mode, name, **kwargs):
5194
    self.lu = lu
5195
    # init buffer variables
5196
    self.in_text = self.out_text = self.in_data = self.out_data = None
5197
    # init all input fields so that pylint is happy
5198
    self.mode = mode
5199
    self.name = name
5200
    self.mem_size = self.disks = self.disk_template = None
5201
    self.os = self.tags = self.nics = self.vcpus = None
5202
    self.relocate_from = None
5203
    # computed fields
5204
    self.required_nodes = None
5205
    # init result fields
5206
    self.success = self.info = self.nodes = None
5207
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5208
      keyset = self._ALLO_KEYS
5209
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5210
      keyset = self._RELO_KEYS
5211
    else:
5212
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5213
                                   " IAllocator" % self.mode)
5214
    for key in kwargs:
5215
      if key not in keyset:
5216
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5217
                                     " IAllocator" % key)
5218
      setattr(self, key, kwargs[key])
5219
    for key in keyset:
5220
      if key not in kwargs:
5221
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5222
                                     " IAllocator" % key)
5223
    self._BuildInputData()
5224

    
5225
  def _ComputeClusterData(self):
5226
    """Compute the generic allocator input data.
5227

5228
    This is the data that is independent of the actual operation.
5229

5230
    """
5231
    cfg = self.lu.cfg
5232
    cluster_info = cfg.GetClusterInfo()
5233
    # cluster data
5234
    data = {
5235
      "version": 1,
5236
      "cluster_name": cfg.GetClusterName(),
5237
      "cluster_tags": list(cluster_info.GetTags()),
5238
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5239
      # we don't have job IDs
5240
      }
5241

    
5242
    i_list = []
5243
    cluster = self.cfg.GetClusterInfo()
5244
    for iname in cfg.GetInstanceList():
5245
      i_obj = cfg.GetInstanceInfo(iname)
5246
      i_list.append((i_obj, cluster.FillBE(i_obj)))
5247

    
5248
    # node data
5249
    node_results = {}
5250
    node_list = cfg.GetNodeList()
5251
    # FIXME: here we have only one hypervisor information, but
5252
    # instance can belong to different hypervisors
5253
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5254
                                           cfg.GetHypervisorType())
5255
    for nname in node_list:
5256
      ninfo = cfg.GetNodeInfo(nname)
5257
      if nname not in node_data or not isinstance(node_data[nname], dict):
5258
        raise errors.OpExecError("Can't get data for node %s" % nname)
5259
      remote_info = node_data[nname]
5260
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5261
                   'vg_size', 'vg_free', 'cpu_total']:
5262
        if attr not in remote_info:
5263
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5264
                                   (nname, attr))
5265
        try:
5266
          remote_info[attr] = int(remote_info[attr])
5267
        except ValueError, err:
5268
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5269
                                   " %s" % (nname, attr, str(err)))
5270
      # compute memory used by primary instances
5271
      i_p_mem = i_p_up_mem = 0
5272
      for iinfo, beinfo in i_list:
5273
        if iinfo.primary_node == nname:
5274
          i_p_mem += beinfo[constants.BE_MEMORY]
5275
          if iinfo.status == "up":
5276
            i_p_up_mem += beinfo[constants.BE_MEMORY]
5277

    
5278
      # compute memory used by instances
5279
      pnr = {
5280
        "tags": list(ninfo.GetTags()),
5281
        "total_memory": remote_info['memory_total'],
5282
        "reserved_memory": remote_info['memory_dom0'],
5283
        "free_memory": remote_info['memory_free'],
5284
        "i_pri_memory": i_p_mem,
5285
        "i_pri_up_memory": i_p_up_mem,
5286
        "total_disk": remote_info['vg_size'],
5287
        "free_disk": remote_info['vg_free'],
5288
        "primary_ip": ninfo.primary_ip,
5289
        "secondary_ip": ninfo.secondary_ip,
5290
        "total_cpus": remote_info['cpu_total'],
5291
        }
5292
      node_results[nname] = pnr
5293
    data["nodes"] = node_results
5294

    
5295
    # instance data
5296
    instance_data = {}
5297
    for iinfo, beinfo in i_list:
5298
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5299
                  for n in iinfo.nics]
5300
      pir = {
5301
        "tags": list(iinfo.GetTags()),
5302
        "should_run": iinfo.status == "up",
5303
        "vcpus": beinfo[constants.BE_VCPUS],
5304
        "memory": beinfo[constants.BE_MEMORY],
5305
        "os": iinfo.os,
5306
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5307
        "nics": nic_data,
5308
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5309
        "disk_template": iinfo.disk_template,
5310
        "hypervisor": iinfo.hypervisor,
5311
        }
5312
      instance_data[iinfo.name] = pir
5313

    
5314
    data["instances"] = instance_data
5315

    
5316
    self.in_data = data
5317

    
5318
  def _AddNewInstance(self):
5319
    """Add new instance data to allocator structure.
5320

5321
    This in combination with _AllocatorGetClusterData will create the
5322
    correct structure needed as input for the allocator.
5323

5324
    The checks for the completeness of the opcode must have already been
5325
    done.
5326

5327
    """
5328
    data = self.in_data
5329
    if len(self.disks) != 2:
5330
      raise errors.OpExecError("Only two-disk configurations supported")
5331

    
5332
    disk_space = _ComputeDiskSize(self.disk_template,
5333
                                  self.disks[0]["size"], self.disks[1]["size"])
5334

    
5335
    if self.disk_template in constants.DTS_NET_MIRROR:
5336
      self.required_nodes = 2
5337
    else:
5338
      self.required_nodes = 1
5339
    request = {
5340
      "type": "allocate",
5341
      "name": self.name,
5342
      "disk_template": self.disk_template,
5343
      "tags": self.tags,
5344
      "os": self.os,
5345
      "vcpus": self.vcpus,
5346
      "memory": self.mem_size,
5347
      "disks": self.disks,
5348
      "disk_space_total": disk_space,
5349
      "nics": self.nics,
5350
      "required_nodes": self.required_nodes,
5351
      }
5352
    data["request"] = request
5353

    
5354
  def _AddRelocateInstance(self):
5355
    """Add relocate instance data to allocator structure.
5356

5357
    This in combination with _IAllocatorGetClusterData will create the
5358
    correct structure needed as input for the allocator.
5359

5360
    The checks for the completeness of the opcode must have already been
5361
    done.
5362

5363
    """
5364
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5365
    if instance is None:
5366
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5367
                                   " IAllocator" % self.name)
5368

    
5369
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5370
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5371

    
5372
    if len(instance.secondary_nodes) != 1:
5373
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5374

    
5375
    self.required_nodes = 1
5376

    
5377
    disk_space = _ComputeDiskSize(instance.disk_template,
5378
                                  instance.disks[0].size,
5379
                                  instance.disks[1].size)
5380

    
5381
    request = {
5382
      "type": "relocate",
5383
      "name": self.name,
5384
      "disk_space_total": disk_space,
5385
      "required_nodes": self.required_nodes,
5386
      "relocate_from": self.relocate_from,
5387
      }
5388
    self.in_data["request"] = request
5389

    
5390
  def _BuildInputData(self):
5391
    """Build input data structures.
5392

5393
    """
5394
    self._ComputeClusterData()
5395

    
5396
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5397
      self._AddNewInstance()
5398
    else:
5399
      self._AddRelocateInstance()
5400

    
5401
    self.in_text = serializer.Dump(self.in_data)
5402

    
5403
  def Run(self, name, validate=True, call_fn=None):
5404
    """Run an instance allocator and return the results.
5405

5406
    """
5407
    if call_fn is None:
5408
      call_fn = self.lu.rpc.call_iallocator_runner
5409
    data = self.in_text
5410

    
5411
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5412

    
5413
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5414
      raise errors.OpExecError("Invalid result from master iallocator runner")
5415

    
5416
    rcode, stdout, stderr, fail = result
5417

    
5418
    if rcode == constants.IARUN_NOTFOUND:
5419
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5420
    elif rcode == constants.IARUN_FAILURE:
5421
      raise errors.OpExecError("Instance allocator call failed: %s,"
5422
                               " output: %s" % (fail, stdout+stderr))
5423
    self.out_text = stdout
5424
    if validate:
5425
      self._ValidateResult()
5426

    
5427
  def _ValidateResult(self):
5428
    """Process the allocator results.
5429

5430
    This will process and if successful save the result in
5431
    self.out_data and the other parameters.
5432

5433
    """
5434
    try:
5435
      rdict = serializer.Load(self.out_text)
5436
    except Exception, err:
5437
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5438

    
5439
    if not isinstance(rdict, dict):
5440
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5441

    
5442
    for key in "success", "info", "nodes":
5443
      if key not in rdict:
5444
        raise errors.OpExecError("Can't parse iallocator results:"
5445
                                 " missing key '%s'" % key)
5446
      setattr(self, key, rdict[key])
5447

    
5448
    if not isinstance(rdict["nodes"], list):
5449
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5450
                               " is not a list")
5451
    self.out_data = rdict
5452

    
5453

    
5454
class LUTestAllocator(NoHooksLU):
5455
  """Run allocator tests.
5456

5457
  This LU runs the allocator tests
5458

5459
  """
5460
  _OP_REQP = ["direction", "mode", "name"]
5461

    
5462
  def CheckPrereq(self):
5463
    """Check prerequisites.
5464

5465
    This checks the opcode parameters depending on the director and mode test.
5466

5467
    """
5468
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5469
      for attr in ["name", "mem_size", "disks", "disk_template",
5470
                   "os", "tags", "nics", "vcpus"]:
5471
        if not hasattr(self.op, attr):
5472
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5473
                                     attr)
5474
      iname = self.cfg.ExpandInstanceName(self.op.name)
5475
      if iname is not None:
5476
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5477
                                   iname)
5478
      if not isinstance(self.op.nics, list):
5479
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5480
      for row in self.op.nics:
5481
        if (not isinstance(row, dict) or
5482
            "mac" not in row or
5483
            "ip" not in row or
5484
            "bridge" not in row):
5485
          raise errors.OpPrereqError("Invalid contents of the"
5486
                                     " 'nics' parameter")
5487
      if not isinstance(self.op.disks, list):
5488
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5489
      if len(self.op.disks) != 2:
5490
        raise errors.OpPrereqError("Only two-disk configurations supported")
5491
      for row in self.op.disks:
5492
        if (not isinstance(row, dict) or
5493
            "size" not in row or
5494
            not isinstance(row["size"], int) or
5495
            "mode" not in row or
5496
            row["mode"] not in ['r', 'w']):
5497
          raise errors.OpPrereqError("Invalid contents of the"
5498
                                     " 'disks' parameter")
5499
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5500
      if not hasattr(self.op, "name"):
5501
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5502
      fname = self.cfg.ExpandInstanceName(self.op.name)
5503
      if fname is None:
5504
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5505
                                   self.op.name)
5506
      self.op.name = fname
5507
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5508
    else:
5509
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5510
                                 self.op.mode)
5511

    
5512
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5513
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5514
        raise errors.OpPrereqError("Missing allocator name")
5515
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5516
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5517
                                 self.op.direction)
5518

    
5519
  def Exec(self, feedback_fn):
5520
    """Run the allocator test.
5521

5522
    """
5523
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5524
      ial = IAllocator(self,
5525
                       mode=self.op.mode,
5526
                       name=self.op.name,
5527
                       mem_size=self.op.mem_size,
5528
                       disks=self.op.disks,
5529
                       disk_template=self.op.disk_template,
5530
                       os=self.op.os,
5531
                       tags=self.op.tags,
5532
                       nics=self.op.nics,
5533
                       vcpus=self.op.vcpus,
5534
                       )
5535
    else:
5536
      ial = IAllocator(self,
5537
                       mode=self.op.mode,
5538
                       name=self.op.name,
5539
                       relocate_from=list(self.relocate_from),
5540
                       )
5541

    
5542
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5543
      result = ial.in_text
5544
    else:
5545
      ial.Run(self.op.allocator, validate=False)
5546
      result = ial.out_text
5547
    return result