root / lib / cmdlib.py @ 02691904
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_MASTER = True
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90

    
91
    for attr_name in self._OP_REQP:
92
      attr_val = getattr(op, attr_name, None)
93
      if attr_val is None:
94
        raise errors.OpPrereqError("Required parameter '%s' missing" %
95
                                   attr_name)
96

    
97
    if not self.cfg.IsCluster():
98
      raise errors.OpPrereqError("Cluster not initialized yet,"
99
                                 " use 'gnt-cluster init' first.")
100
    if self.REQ_MASTER:
101
      master = self.cfg.GetMasterNode()
102
      if master != utils.HostInfo().name:
103
        raise errors.OpPrereqError("Commands must be run on the master"
104
                                   " node %s" % master)
105

    
106
  def __GetSSH(self):
107
    """Returns the SshRunner object
108

109
    """
110
    if not self.__ssh:
111
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
112
    return self.__ssh
113

    
114
  ssh = property(fget=__GetSSH)
115

    
116
  def ExpandNames(self):
117
    """Expand names for this LU.
118

119
    This method is called before starting to execute the opcode, and it should
120
    update all the parameters of the opcode to their canonical form (e.g. a
121
    short node name must be fully expanded after this method has successfully
122
    completed). This way locking, hooks, logging, etc. can work correctly.
123

124
    LUs which implement this method must also populate the self.needed_locks
125
    member, as a dict with lock levels as keys, and a list of needed lock names
126
    as values. Rules:
127
      - Use an empty dict if you don't need any lock
128
      - If you don't need any lock at a particular level omit that level
129
      - Don't put anything for the BGL level
130
      - If you want all locks at a level use locking.ALL_SET as a value
131

132
    If you need to share locks (rather than acquire them exclusively) at one
133
    level you can modify self.share_locks, setting a true value (usually 1) for
134
    that level. By default locks are not shared.
135

136
    Examples:
137
    # Acquire all nodes and one instance
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: locking.ALL_SET,
140
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141
    }
142
    # Acquire just two nodes
143
    self.needed_locks = {
144
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145
    }
146
    # Acquire no locks
147
    self.needed_locks = {} # No, you can't leave it to the default value None
148

149
    """
150
    # The implementation of this method is mandatory only if the new LU is
151
    # concurrent, so that old LUs don't need to be changed all at the same
152
    # time.
153
    if self.REQ_BGL:
154
      self.needed_locks = {} # Exclusive LUs don't need locks.
155
    else:
156
      raise NotImplementedError
157

    
158
  def DeclareLocks(self, level):
159
    """Declare LU locking needs for a level
160

161
    While most LUs can just declare their locking needs at ExpandNames time,
162
    sometimes there's the need to calculate some locks after having acquired
163
    the ones before. This function is called just before acquiring locks at a
164
    particular level, but after acquiring the ones at lower levels, and permits
165
    such calculations. It can be used to modify self.needed_locks, and by
166
    default it does nothing.
167

168
    This function is only called if you have something already set in
169
    self.needed_locks for the level.
170

171
    @param level: Locking level which is going to be locked
172
    @type level: member of ganeti.locking.LEVELS
173

174
    """
175

    
176
  def CheckPrereq(self):
177
    """Check prerequisites for this LU.
178

179
    This method should check that the prerequisites for the execution
180
    of this LU are fulfilled. It can do internode communication, but
181
    it should be idempotent - no cluster or system changes are
182
    allowed.
183

184
    The method should raise errors.OpPrereqError in case something is
185
    not fulfilled. Its return value is ignored.
186

187
    This method should also update all the parameters of the opcode to
188
    their canonical form if it hasn't been done by ExpandNames before.
189

190
    """
191
    raise NotImplementedError
192

    
193
  def Exec(self, feedback_fn):
194
    """Execute the LU.
195

196
    This method should implement the actual work. It should raise
197
    errors.OpExecError for failures that are somewhat dealt with in
198
    code, or expected.
199

200
    """
201
    raise NotImplementedError
202

    
203
  def BuildHooksEnv(self):
204
    """Build hooks environment for this LU.
205

206
    This method should return a three-element tuple consisting of: a dict
207
    containing the environment that will be used for running the
208
    specific hook for this LU, a list of node names on which the hook
209
    should run before the execution, and a list of node names on which
210
    the hook should run after the execution.
211

212
    The keys of the dict must not be prefixed with 'GANETI_', as that prefix
    is handled by the hooks runner; note that the runner will also add
    further keys of its own. If the LU doesn't define any environment, an
    empty dict (and not None) should be returned.
216

217
    If there are no nodes to run on for a phase, an empty list (and not
    None) should be returned.
218

219
    Note that if the HPATH for a LU class is None, this function will
220
    not be called.
221

222
    """
223
    raise NotImplementedError
224

    
225
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226
    """Notify the LU about the results of its hooks.
227

228
    This method is called every time a hooks phase is executed, and notifies
229
    the Logical Unit about the hooks' result. The LU can then use it to alter
230
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if
    it wants to make use of the local cluster hook-scripts somehow.
233

234
    Args:
235
      phase: the hooks phase that has just been run
236
      hooks_results: the results of the multi-node hooks rpc call
237
      feedback_fn: function to send feedback back to the caller
238
      lu_result: the previous result this LU had, or None in the PRE phase.
239

240
    """
241
    return lu_result
242

    
243
  def _ExpandAndLockInstance(self):
244
    """Helper function to expand and lock an instance.
245

246
    Many LUs that work on an instance take its name in self.op.instance_name
247
    and need to expand it and then declare the expanded name for locking. This
248
    function does it, and then updates self.op.instance_name to the expanded
249
    name. It also initializes needed_locks as a dict, if this hasn't been done
250
    before.
251

252
    """
253
    if self.needed_locks is None:
254
      self.needed_locks = {}
255
    else:
256
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257
        "_ExpandAndLockInstance called with instance-level locks set"
258
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259
    if expanded_name is None:
260
      raise errors.OpPrereqError("Instance '%s' not known" %
261
                                  self.op.instance_name)
262
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263
    self.op.instance_name = expanded_name
264

    
265
  def _LockInstancesNodes(self, primary_only=False):
266
    """Helper function to declare instances' nodes for locking.
267

268
    This function should be called after locking one or more instances to lock
269
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270
    with all primary or secondary nodes for instances already locked and
271
    present in self.needed_locks[locking.LEVEL_INSTANCE].
272

273
    It should be called from DeclareLocks, and for safety only works if
274
    self.recalculate_locks[locking.LEVEL_NODE] is set.
275

276
    In the future it may grow parameters to just lock some instance's nodes, or
277
    to just lock primaries or secondary nodes, if needed.
278

279
    It should be called from DeclareLocks in a way similar to:
280

281
    if level == locking.LEVEL_NODE:
282
      self._LockInstancesNodes()
283

284
    @type primary_only: boolean
285
    @param primary_only: only lock primary nodes of locked instances
286

287
    """
288
    assert locking.LEVEL_NODE in self.recalculate_locks, \
289
      "_LockInstancesNodes helper function called with no nodes to recalculate"
290

    
291
    # TODO: check if we've really been called with the instance locks held
292

    
293
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294
    # future we might want to have different behaviors depending on the value
295
    # of self.recalculate_locks[locking.LEVEL_NODE]
296
    wanted_nodes = []
297
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298
      instance = self.context.cfg.GetInstanceInfo(instance_name)
299
      wanted_nodes.append(instance.primary_node)
300
      if not primary_only:
301
        wanted_nodes.extend(instance.secondary_nodes)
302

    
303
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
307

    
308
    del self.recalculate_locks[locking.LEVEL_NODE]
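
# A minimal sketch of a concurrent LU that follows the rules in the
# LogicalUnit docstring above; it is illustrative only and not one of the
# real LUs of this module (the "message" opcode parameter and the hook path
# are invented).
class LUExampleNoop(LogicalUnit):
  """Example no-op logical unit, for illustration only."""
  HPATH = "example-noop"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["message"]
  REQ_BGL = False

  def ExpandNames(self):
    # No locks needed; an empty dict (never None) declares exactly that.
    self.needed_locks = {}

  def CheckPrereq(self):
    # Nothing to verify; this must stay idempotent and change no state.
    pass

  def BuildHooksEnv(self):
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def Exec(self, feedback_fn):
    feedback_fn(self.op.message)
    return True
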
309

    
310

    
311
class NoHooksLU(LogicalUnit):
312
  """Simple LU which runs no hooks.
313

314
  This LU is intended as a parent for other LogicalUnits which will
315
  run no hooks, in order to reduce duplicate code.
316

317
  """
318
  HPATH = None
319
  HTYPE = None
320

    
321

    
322
def _GetWantedNodes(lu, nodes):
323
  """Returns list of checked and expanded node names.
324

325
  Args:
326
    nodes: List of nodes (strings) or None for all
327

328
  """
329
  if not isinstance(nodes, list):
330
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
331

    
332
  if not nodes:
333
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334
      " non-empty list of nodes whose name is to be expanded.")
335

    
336
  wanted = []
337
  for name in nodes:
338
    node = lu.cfg.ExpandNodeName(name)
339
    if node is None:
340
      raise errors.OpPrereqError("No such node name '%s'" % name)
341
    wanted.append(node)
342

    
343
  return utils.NiceSort(wanted)
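
# Usage sketch (names invented): _GetWantedNodes(self, ["node1"]) expands the
# short name through the configuration and returns the nicely sorted full
# names, e.g. ["node1.example.com"], raising OpPrereqError for unknown nodes.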
344

    
345

    
346
def _GetWantedInstances(lu, instances):
347
  """Returns list of checked and expanded instance names.
348

349
  Args:
350
    instances: List of instances (strings) or None for all
351

352
  """
353
  if not isinstance(instances, list):
354
    raise errors.OpPrereqError("Invalid argument type 'instances'")
355

    
356
  if instances:
357
    wanted = []
358

    
359
    for name in instances:
360
      instance = lu.cfg.ExpandInstanceName(name)
361
      if instance is None:
362
        raise errors.OpPrereqError("No such instance name '%s'" % name)
363
      wanted.append(instance)
364

    
365
  else:
366
    wanted = lu.cfg.GetInstanceList()
367
  return utils.NiceSort(wanted)
368

    
369

    
370
def _CheckOutputFields(static, dynamic, selected):
371
  """Checks whether all selected fields are valid.
372

373
  Args:
374
    static: Static fields
375
    dynamic: Dynamic fields
376

377
  """
378
  static_fields = frozenset(static)
379
  dynamic_fields = frozenset(dynamic)
380

    
381
  all_fields = static_fields | dynamic_fields
382

    
383
  if not all_fields.issuperset(selected):
384
    raise errors.OpPrereqError("Unknown output fields selected: %s"
385
                               % ",".join(frozenset(selected).
386
                                          difference(all_fields)))
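
# For illustration (field names invented): any mix of declared static and
# dynamic fields passes, while an undeclared field raises OpPrereqError:
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])    # accepted
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])            # raises OpPrereqError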
387

    
388

    
389
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390
                          memory, vcpus, nics):
391
  """Builds instance related env variables for hooks from single variables.
392

393
  Args:
394
    secondary_nodes: List of secondary nodes as strings
395
  """
396
  env = {
397
    "OP_TARGET": name,
398
    "INSTANCE_NAME": name,
399
    "INSTANCE_PRIMARY": primary_node,
400
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401
    "INSTANCE_OS_TYPE": os_type,
402
    "INSTANCE_STATUS": status,
403
    "INSTANCE_MEMORY": memory,
404
    "INSTANCE_VCPUS": vcpus,
405
  }
406

    
407
  if nics:
408
    nic_count = len(nics)
409
    for idx, (ip, bridge, mac) in enumerate(nics):
410
      if ip is None:
411
        ip = ""
412
      env["INSTANCE_NIC%d_IP" % idx] = ip
413
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415
  else:
416
    nic_count = 0
417

    
418
  env["INSTANCE_NIC_COUNT"] = nic_count
419

    
420
  return env
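
# Example of the environment built for a hypothetical single-NIC instance
# (all values invented for illustration):
#   {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_SECONDARIES": "node2.example.com",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#    "INSTANCE_NIC_COUNT": 1}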
421

    
422

    
423
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
424
  """Builds instance related env variables for hooks from an object.
425

426
  Args:
427
    instance: objects.Instance object of instance
428
    override: dict of values to override
429
  """
430
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
431
  args = {
432
    'name': instance.name,
433
    'primary_node': instance.primary_node,
434
    'secondary_nodes': instance.secondary_nodes,
435
    'os_type': instance.os,
436
    'status': instance.status,
437
    'memory': bep[constants.BE_MEMORY],
438
    'vcpus': bep[constants.BE_VCPUS],
439
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
440
  }
441
  if override:
442
    args.update(override)
443
  return _BuildInstanceHookEnv(**args)
444

    
445

    
446
def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.
448

449
  """
450
  # check bridges existence
451
  brlist = [nic.bridge for nic in instance.nics]
452
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
453
    raise errors.OpPrereqError("one or more target bridges %s do not"
454
                               " exist on destination node '%s'" %
455
                               (brlist, instance.primary_node))
456

    
457

    
458
class LUDestroyCluster(NoHooksLU):
459
  """Logical unit for destroying the cluster.
460

461
  """
462
  _OP_REQP = []
463

    
464
  def CheckPrereq(self):
465
    """Check prerequisites.
466

467
    This checks whether the cluster is empty.
468

469
    Any errors are signalled by raising errors.OpPrereqError.
470

471
    """
472
    master = self.cfg.GetMasterNode()
473

    
474
    nodelist = self.cfg.GetNodeList()
475
    if len(nodelist) != 1 or nodelist[0] != master:
476
      raise errors.OpPrereqError("There are still %d node(s) in"
477
                                 " this cluster." % (len(nodelist) - 1))
478
    instancelist = self.cfg.GetInstanceList()
479
    if instancelist:
480
      raise errors.OpPrereqError("There are still %d instance(s) in"
481
                                 " this cluster." % len(instancelist))
482

    
483
  def Exec(self, feedback_fn):
484
    """Destroys the cluster.
485

486
    """
487
    master = self.cfg.GetMasterNode()
488
    if not self.rpc.call_node_stop_master(master, False):
489
      raise errors.OpExecError("Could not disable the master role")
490
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
491
    utils.CreateBackup(priv_key)
492
    utils.CreateBackup(pub_key)
493
    return master
494

    
495

    
496
class LUVerifyCluster(LogicalUnit):
497
  """Verifies the cluster status.
498

499
  """
500
  HPATH = "cluster-verify"
501
  HTYPE = constants.HTYPE_CLUSTER
502
  _OP_REQP = ["skip_checks"]
503
  REQ_BGL = False
504

    
505
  def ExpandNames(self):
506
    self.needed_locks = {
507
      locking.LEVEL_NODE: locking.ALL_SET,
508
      locking.LEVEL_INSTANCE: locking.ALL_SET,
509
    }
510
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
511

    
512
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
513
                  remote_version, feedback_fn):
514
    """Run multiple tests against a node.
515

516
    Test list:
517
      - compares ganeti version
518
      - checks vg existence and size > 20G
519
      - checks config file checksum
520
      - checks ssh to other nodes
521

522
    Args:
523
      node: name of the node to check
524
      file_list: required list of files
525
      local_cksum: dictionary of local files and their checksums
526

527
    """
528
    # compares ganeti version
529
    local_version = constants.PROTOCOL_VERSION
530
    if not remote_version:
531
      feedback_fn("  - ERROR: connection to %s failed" % (node))
532
      return True
533

    
534
    if local_version != remote_version:
535
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
536
                      (local_version, node, remote_version))
537
      return True
538

    
539
    # checks vg existence and size > 20G
540

    
541
    bad = False
542
    if not vglist:
543
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
544
                      (node,))
545
      bad = True
546
    else:
547
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
548
                                            constants.MIN_VG_SIZE)
549
      if vgstatus:
550
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
551
        bad = True
552

    
553
    if not node_result:
554
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
555
      return True
556

    
557
    # checks config file checksum
558
    # checks ssh connectivity to the other nodes
559

    
560
    if 'filelist' not in node_result:
561
      bad = True
562
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
563
    else:
564
      remote_cksum = node_result['filelist']
565
      for file_name in file_list:
566
        if file_name not in remote_cksum:
567
          bad = True
568
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
569
        elif remote_cksum[file_name] != local_cksum[file_name]:
570
          bad = True
571
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
572

    
573
    if 'nodelist' not in node_result:
574
      bad = True
575
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
576
    else:
577
      if node_result['nodelist']:
578
        bad = True
579
        for node in node_result['nodelist']:
580
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
581
                          (node, node_result['nodelist'][node]))
582
    if 'node-net-test' not in node_result:
583
      bad = True
584
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
585
    else:
586
      if node_result['node-net-test']:
587
        bad = True
588
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
589
        for node in nlist:
590
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
591
                          (node, node_result['node-net-test'][node]))
592

    
593
    hyp_result = node_result.get('hypervisor', None)
594
    if isinstance(hyp_result, dict):
595
      for hv_name, hv_result in hyp_result.iteritems():
596
        if hv_result is not None:
597
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
598
                      (hv_name, hv_result))
599
    return bad
600

    
601
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
602
                      node_instance, feedback_fn):
603
    """Verify an instance.
604

605
    This function checks to see if the required block devices are
606
    available on the instance's node.
607

608
    """
609
    bad = False
610

    
611
    node_current = instanceconfig.primary_node
612

    
613
    node_vol_should = {}
614
    instanceconfig.MapLVsByNode(node_vol_should)
615

    
616
    for node in node_vol_should:
617
      for volume in node_vol_should[node]:
618
        if node not in node_vol_is or volume not in node_vol_is[node]:
619
          feedback_fn("  - ERROR: volume %s missing on node %s" %
620
                          (volume, node))
621
          bad = True
622

    
623
    if not instanceconfig.status == 'down':
624
      if (node_current not in node_instance or
625
          not instance in node_instance[node_current]):
626
        feedback_fn("  - ERROR: instance %s not running on node %s" %
627
                        (instance, node_current))
628
        bad = True
629

    
630
    for node in node_instance:
631
      if (not node == node_current):
632
        if instance in node_instance[node]:
633
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
634
                          (instance, node))
635
          bad = True
636

    
637
    return bad
638

    
639
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
640
    """Verify if there are any unknown volumes in the cluster.
641

642
    The .os, .swap and backup volumes are ignored. All other volumes are
643
    reported as unknown.
644

645
    """
646
    bad = False
647

    
648
    for node in node_vol_is:
649
      for volume in node_vol_is[node]:
650
        if node not in node_vol_should or volume not in node_vol_should[node]:
651
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
652
                      (volume, node))
653
          bad = True
654
    return bad
655

    
656
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
657
    """Verify the list of running instances.
658

659
    This checks what instances are running but unknown to the cluster.
660

661
    """
662
    bad = False
663
    for node in node_instance:
664
      for runninginstance in node_instance[node]:
665
        if runninginstance not in instancelist:
666
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
667
                          (runninginstance, node))
668
          bad = True
669
    return bad
670

    
671
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
672
    """Verify N+1 Memory Resilience.
673

674
    Check that if one single node dies we can still start all the instances it
675
    was primary for.
676

677
    """
678
    bad = False
679

    
680
    for node, nodeinfo in node_info.iteritems():
681
      # This code checks that every node which is now listed as secondary has
682
      # enough memory to host all the instances it is supposed to take over,
      # should a single other node in the cluster fail.
684
      # FIXME: not ready for failover to an arbitrary node
685
      # FIXME: does not support file-backed instances
686
      # WARNING: we currently take into account down instances as well as up
687
      # ones, considering that even if they're down someone might want to start
688
      # them even in the event of a node failure.
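      # Worked example (numbers invented): if this node is secondary for two
      # auto-balanced instances whose primary is node "nodeA", needing 1024
      # and 2048 MB respectively, then at least 3072 MB must be free here for
      # the cluster to be considered N+1 safe against "nodeA" failing.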
689
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
690
        needed_mem = 0
691
        for instance in instances:
692
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
693
          if bep[constants.BE_AUTO_BALANCE]:
694
            needed_mem += bep[constants.BE_MEMORY]
695
        if nodeinfo['mfree'] < needed_mem:
696
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
697
                      " failovers should node %s fail" % (node, prinode))
698
          bad = True
699
    return bad
700

    
701
  def CheckPrereq(self):
702
    """Check prerequisites.
703

704
    Transform the list of checks we're going to skip into a set and check that
705
    all its members are valid.
706

707
    """
708
    self.skip_set = frozenset(self.op.skip_checks)
709
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
710
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
711

    
712
  def BuildHooksEnv(self):
713
    """Build hooks env.
714

715
    Cluster-Verify hooks run just in the post phase; their failure causes the
    output to be logged in the verify output and makes the verification fail.
717

718
    """
719
    all_nodes = self.cfg.GetNodeList()
720
    # TODO: populate the environment with useful information for verify hooks
721
    env = {}
722
    return env, [], all_nodes
723

    
724
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.
726

727
    """
728
    bad = False
729
    feedback_fn("* Verifying global settings")
730
    for msg in self.cfg.VerifyConfig():
731
      feedback_fn("  - ERROR: %s" % msg)
732

    
733
    vg_name = self.cfg.GetVGName()
734
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
735
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
736
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
737
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
738
    i_non_redundant = [] # Non redundant instances
739
    i_non_a_balanced = [] # Non auto-balanced instances
740
    node_volume = {}
741
    node_instance = {}
742
    node_info = {}
743
    instance_cfg = {}
744

    
745
    # FIXME: verify OS list
746
    # do local checksums
747
    file_names = []
748
    file_names.append(constants.SSL_CERT_FILE)
749
    file_names.append(constants.CLUSTER_CONF_FILE)
750
    local_checksums = utils.FingerprintFiles(file_names)
751

    
752
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
753
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
754
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
755
    all_vglist = self.rpc.call_vg_list(nodelist)
756
    node_verify_param = {
757
      'filelist': file_names,
758
      'nodelist': nodelist,
759
      'hypervisor': hypervisors,
760
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
761
                        for node in nodeinfo]
762
      }
763
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
764
                                           self.cfg.GetClusterName())
765
    all_rversion = self.rpc.call_version(nodelist)
766
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
767
                                        self.cfg.GetHypervisorType())
768

    
769
    cluster = self.cfg.GetClusterInfo()
770
    for node in nodelist:
771
      feedback_fn("* Verifying node %s" % node)
772
      result = self._VerifyNode(node, file_names, local_checksums,
773
                                all_vglist[node], all_nvinfo[node],
774
                                all_rversion[node], feedback_fn)
775
      bad = bad or result
776

    
777
      # node_volume
778
      volumeinfo = all_volumeinfo[node]
779

    
780
      if isinstance(volumeinfo, basestring):
781
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
782
                    (node, volumeinfo[-400:].encode('string_escape')))
783
        bad = True
784
        node_volume[node] = {}
785
      elif not isinstance(volumeinfo, dict):
786
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
787
        bad = True
788
        continue
789
      else:
790
        node_volume[node] = volumeinfo
791

    
792
      # node_instance
793
      nodeinstance = all_instanceinfo[node]
794
      if type(nodeinstance) != list:
795
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
796
        bad = True
797
        continue
798

    
799
      node_instance[node] = nodeinstance
800

    
801
      # node_info
802
      nodeinfo = all_ninfo[node]
803
      if not isinstance(nodeinfo, dict):
804
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
805
        bad = True
806
        continue
807

    
808
      try:
809
        node_info[node] = {
810
          "mfree": int(nodeinfo['memory_free']),
811
          "dfree": int(nodeinfo['vg_free']),
812
          "pinst": [],
813
          "sinst": [],
814
          # dictionary holding all instances this node is secondary for,
815
          # grouped by their primary node. Each key is a cluster node, and each
816
          # value is a list of instances which have the key as primary and the
817
          # current node as secondary.  this is handy to calculate N+1 memory
818
          # availability if you can only failover from a primary to its
819
          # secondary.
820
          "sinst-by-pnode": {},
821
        }
822
      except ValueError:
823
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
824
        bad = True
825
        continue
826

    
827
    node_vol_should = {}
828

    
829
    for instance in instancelist:
830
      feedback_fn("* Verifying instance %s" % instance)
831
      inst_config = self.cfg.GetInstanceInfo(instance)
832
      result =  self._VerifyInstance(instance, inst_config, node_volume,
833
                                     node_instance, feedback_fn)
834
      bad = bad or result
835

    
836
      inst_config.MapLVsByNode(node_vol_should)
837

    
838
      instance_cfg[instance] = inst_config
839

    
840
      pnode = inst_config.primary_node
841
      if pnode in node_info:
842
        node_info[pnode]['pinst'].append(instance)
843
      else:
844
        feedback_fn("  - ERROR: instance %s, connection to primary node"
845
                    " %s failed" % (instance, pnode))
846
        bad = True
847

    
848
      # If the instance is non-redundant we cannot survive losing its primary
849
      # node, so we are not N+1 compliant. On the other hand we have no disk
850
      # templates with more than one secondary so that situation is not well
851
      # supported either.
852
      # FIXME: does not support file-backed instances
853
      if len(inst_config.secondary_nodes) == 0:
854
        i_non_redundant.append(instance)
855
      elif len(inst_config.secondary_nodes) > 1:
856
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
857
                    % instance)
858

    
859
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
860
        i_non_a_balanced.append(instance)
861

    
862
      for snode in inst_config.secondary_nodes:
863
        if snode in node_info:
864
          node_info[snode]['sinst'].append(instance)
865
          if pnode not in node_info[snode]['sinst-by-pnode']:
866
            node_info[snode]['sinst-by-pnode'][pnode] = []
867
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
868
        else:
869
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
870
                      " %s failed" % (instance, snode))
871

    
872
    feedback_fn("* Verifying orphan volumes")
873
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
874
                                       feedback_fn)
875
    bad = bad or result
876

    
877
    feedback_fn("* Verifying remaining instances")
878
    result = self._VerifyOrphanInstances(instancelist, node_instance,
879
                                         feedback_fn)
880
    bad = bad or result
881

    
882
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
883
      feedback_fn("* Verifying N+1 Memory redundancy")
884
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
885
      bad = bad or result
886

    
887
    feedback_fn("* Other Notes")
888
    if i_non_redundant:
889
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
890
                  % len(i_non_redundant))
891

    
892
    if i_non_a_balanced:
893
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
894
                  % len(i_non_a_balanced))
895

    
896
    return not bad
897

    
898
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
900
    nicely-formatted feedback back to the user.
901

902
    Args:
903
      phase: the hooks phase that has just been run
904
      hooks_results: the results of the multi-node hooks rpc call
905
      feedback_fn: function to send feedback back to the caller
906
      lu_result: previous Exec result
907

908
    """
909
    # We only really run POST phase hooks, and are only interested in
910
    # their results
911
    if phase == constants.HOOKS_PHASE_POST:
912
      # Used to change hooks' output to proper indentation
913
      indent_re = re.compile('^', re.M)
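      # matching '^' with re.M substitutes at every line start, so a
      # multi-line hook output like "err1\nerr2" is rendered below as
      # "      err1\n      err2" (six spaces of indentation per line)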
914
      feedback_fn("* Hooks Results")
915
      if not hooks_results:
916
        feedback_fn("  - ERROR: general communication failure")
917
        lu_result = 1
918
      else:
919
        for node_name in hooks_results:
920
          show_node_header = True
921
          res = hooks_results[node_name]
922
          if res is False or not isinstance(res, list):
923
            feedback_fn("    Communication failure")
924
            lu_result = 1
925
            continue
926
          for script, hkr, output in res:
927
            if hkr == constants.HKR_FAIL:
928
              # The node header is only shown once, if there are
929
              # failing hooks on that node
930
              if show_node_header:
931
                feedback_fn("  Node %s:" % node_name)
932
                show_node_header = False
933
              feedback_fn("    ERROR: Script %s failed, output:" % script)
934
              output = indent_re.sub('      ', output)
935
              feedback_fn("%s" % output)
936
              lu_result = 1
937

    
938
      return lu_result
939

    
940

    
941
class LUVerifyDisks(NoHooksLU):
942
  """Verifies the cluster disks status.
943

944
  """
945
  _OP_REQP = []
946
  REQ_BGL = False
947

    
948
  def ExpandNames(self):
949
    self.needed_locks = {
950
      locking.LEVEL_NODE: locking.ALL_SET,
951
      locking.LEVEL_INSTANCE: locking.ALL_SET,
952
    }
953
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
954

    
955
  def CheckPrereq(self):
956
    """Check prerequisites.
957

958
    This has no prerequisites.
959

960
    """
961
    pass
962

    
963
  def Exec(self, feedback_fn):
964
    """Verify integrity of cluster disks.
965

966
    """
967
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
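    # result members: nodes that could not be queried, per-node LVM error
    # messages, instances with at least one inactive LV, and a per-instance
    # map of (node, volume) pairs that are missing entirely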
968

    
969
    vg_name = self.cfg.GetVGName()
970
    nodes = utils.NiceSort(self.cfg.GetNodeList())
971
    instances = [self.cfg.GetInstanceInfo(name)
972
                 for name in self.cfg.GetInstanceList()]
973

    
974
    nv_dict = {}
975
    for inst in instances:
976
      inst_lvs = {}
977
      if (inst.status != "up" or
978
          inst.disk_template not in constants.DTS_NET_MIRROR):
979
        continue
980
      inst.MapLVsByNode(inst_lvs)
981
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
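      # e.g. (invented data) LVs of one instance laid out as
      #   {"node1": ["xenvg/disk0"], "node2": ["xenvg/disk0_meta"]}
      # become {("node1", "xenvg/disk0"): inst,
      #         ("node2", "xenvg/disk0_meta"): inst}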
982
      for node, vol_list in inst_lvs.iteritems():
983
        for vol in vol_list:
984
          nv_dict[(node, vol)] = inst
985

    
986
    if not nv_dict:
987
      return result
988

    
989
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
990

    
991
    to_act = set()
992
    for node in nodes:
993
      # node_volume
994
      lvs = node_lvs[node]
995

    
996
      if isinstance(lvs, basestring):
997
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
998
        res_nlvm[node] = lvs
999
      elif not isinstance(lvs, dict):
1000
        logging.warning("Connection to node %s failed or invalid data"
1001
                        " returned", node)
1002
        res_nodes.append(node)
1003
        continue
1004

    
1005
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1006
        inst = nv_dict.pop((node, lv_name), None)
1007
        if (not lv_online and inst is not None
1008
            and inst.name not in res_instances):
1009
          res_instances.append(inst.name)
1010

    
1011
    # any leftover items in nv_dict are missing LVs, let's arrange the
1012
    # data better
1013
    for key, inst in nv_dict.iteritems():
1014
      if inst.name not in res_missing:
1015
        res_missing[inst.name] = []
1016
      res_missing[inst.name].append(key)
1017

    
1018
    return result
1019

    
1020

    
1021
class LURenameCluster(LogicalUnit):
1022
  """Rename the cluster.
1023

1024
  """
1025
  HPATH = "cluster-rename"
1026
  HTYPE = constants.HTYPE_CLUSTER
1027
  _OP_REQP = ["name"]
1028

    
1029
  def BuildHooksEnv(self):
1030
    """Build hooks env.
1031

1032
    """
1033
    env = {
1034
      "OP_TARGET": self.cfg.GetClusterName(),
1035
      "NEW_NAME": self.op.name,
1036
      }
1037
    mn = self.cfg.GetMasterNode()
1038
    return env, [mn], [mn]
1039

    
1040
  def CheckPrereq(self):
1041
    """Verify that the passed name is a valid one.
1042

1043
    """
1044
    hostname = utils.HostInfo(self.op.name)
1045

    
1046
    new_name = hostname.name
1047
    self.ip = new_ip = hostname.ip
1048
    old_name = self.cfg.GetClusterName()
1049
    old_ip = self.cfg.GetMasterIP()
1050
    if new_name == old_name and new_ip == old_ip:
1051
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1052
                                 " cluster has changed")
1053
    if new_ip != old_ip:
1054
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1055
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1056
                                   " reachable on the network. Aborting." %
1057
                                   new_ip)
1058

    
1059
    self.op.name = new_name
1060

    
1061
  def Exec(self, feedback_fn):
1062
    """Rename the cluster.
1063

1064
    """
1065
    clustername = self.op.name
1066
    ip = self.ip
1067

    
1068
    # shutdown the master IP
1069
    master = self.cfg.GetMasterNode()
1070
    if not self.rpc.call_node_stop_master(master, False):
1071
      raise errors.OpExecError("Could not disable the master role")
1072

    
1073
    try:
1074
      # modify the sstore
1075
      # TODO: sstore
1076
      ss.SetKey(ss.SS_MASTER_IP, ip)
1077
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1078

    
1079
      # Distribute updated ss config to all nodes
1080
      myself = self.cfg.GetNodeInfo(master)
1081
      dist_nodes = self.cfg.GetNodeList()
1082
      if myself.name in dist_nodes:
1083
        dist_nodes.remove(myself.name)
1084

    
1085
      logging.debug("Copying updated ssconf data to all nodes")
1086
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1087
        fname = ss.KeyToFilename(keyname)
1088
        result = self.rpc.call_upload_file(dist_nodes, fname)
1089
        for to_node in dist_nodes:
1090
          if not result[to_node]:
1091
            logging.error("Copy of file %s to node %s failed", fname, to_node)
1092
    finally:
1093
      if not self.rpc.call_node_start_master(master, False):
1094
        logging.error("Could not re-enable the master role on the master,"
1095
                      " please restart manually.")
1096

    
1097

    
1098
def _RecursiveCheckIfLVMBased(disk):
1099
  """Check if the given disk or its children are lvm-based.
1100

1101
  Args:
1102
    disk: ganeti.objects.Disk object
1103

1104
  Returns:
1105
    boolean indicating whether a LD_LV dev_type was found or not
1106

1107
  """
1108
  if disk.children:
1109
    for chdisk in disk.children:
1110
      if _RecursiveCheckIfLVMBased(chdisk):
1111
        return True
1112
  return disk.dev_type == constants.LD_LV
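
# For example, a DRBD-backed disk whose two children are logical volumes is
# reported as LVM-based here: the recursion finds an LD_LV child even though
# the top-level dev_type itself is not LD_LV.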
1113

    
1114

    
1115
class LUSetClusterParams(LogicalUnit):
1116
  """Change the parameters of the cluster.
1117

1118
  """
1119
  HPATH = "cluster-modify"
1120
  HTYPE = constants.HTYPE_CLUSTER
1121
  _OP_REQP = []
1122
  REQ_BGL = False
1123

    
1124
  def ExpandNames(self):
1125
    # FIXME: in the future maybe other cluster params won't require checking on
1126
    # all nodes to be modified.
1127
    self.needed_locks = {
1128
      locking.LEVEL_NODE: locking.ALL_SET,
1129
    }
1130
    self.share_locks[locking.LEVEL_NODE] = 1
1131

    
1132
  def BuildHooksEnv(self):
1133
    """Build hooks env.
1134

1135
    """
1136
    env = {
1137
      "OP_TARGET": self.cfg.GetClusterName(),
1138
      "NEW_VG_NAME": self.op.vg_name,
1139
      }
1140
    mn = self.cfg.GetMasterNode()
1141
    return env, [mn], [mn]
1142

    
1143
  def CheckPrereq(self):
1144
    """Check prerequisites.
1145

1146
    This checks whether the given params don't conflict and
1147
    if the given volume group is valid.
1148

1149
    """
1150
    # FIXME: This only works because there is only one parameter that can be
1151
    # changed or removed.
1152
    if self.op.vg_name is not None and not self.op.vg_name:
1153
      instances = self.cfg.GetAllInstancesInfo().values()
1154
      for inst in instances:
1155
        for disk in inst.disks:
1156
          if _RecursiveCheckIfLVMBased(disk):
1157
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1158
                                       " lvm-based instances exist")
1159

    
1160
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1161

    
1162
    # if vg_name not None, checks given volume group on all nodes
1163
    if self.op.vg_name:
1164
      vglist = self.rpc.call_vg_list(node_list)
1165
      for node in node_list:
1166
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1167
                                              constants.MIN_VG_SIZE)
1168
        if vgstatus:
1169
          raise errors.OpPrereqError("Error on node '%s': %s" %
1170
                                     (node, vgstatus))
1171

    
1172
    self.cluster = cluster = self.cfg.GetClusterInfo()
1173
    # beparams changes do not need validation (we can't validate?),
1174
    # but we still process here
1175
    if self.op.beparams:
1176
      self.new_beparams = cluster.FillDict(
1177
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1178

    
1179
    # hypervisor list/parameters
1180
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1181
    if self.op.hvparams:
1182
      if not isinstance(self.op.hvparams, dict):
1183
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1184
      for hv_name, hv_dict in self.op.hvparams.items():
1185
        if hv_name not in self.new_hvparams:
1186
          self.new_hvparams[hv_name] = hv_dict
1187
        else:
1188
          self.new_hvparams[hv_name].update(hv_dict)
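      # e.g. (values invented) merging an input of {'xen-pvm': {'root_path':
      # '/dev/sda1'}} into existing parameters {'xen-pvm': {'kernel_path':
      # '/boot/vmlinuz'}} keeps both keys for that hypervisor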
1189

    
1190
    if self.op.enabled_hypervisors is not None:
1191
      self.hv_list = self.op.enabled_hypervisors
1192
    else:
1193
      self.hv_list = cluster.enabled_hypervisors
1194

    
1195
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1196
      # either the enabled list has changed, or the parameters have, validate
1197
      for hv_name, hv_params in self.new_hvparams.items():
1198
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1199
            (self.op.enabled_hypervisors and
1200
             hv_name in self.op.enabled_hypervisors)):
1201
          # either this is a new hypervisor, or its parameters have changed
1202
          hv_class = hypervisor.GetHypervisor(hv_name)
1203
          hv_class.CheckParameterSyntax(hv_params)
1204
          _CheckHVParams(self, node_list, hv_name, hv_params)
1205

    
1206
  def Exec(self, feedback_fn):
1207
    """Change the parameters of the cluster.
1208

1209
    """
1210
    if self.op.vg_name is not None:
1211
      if self.op.vg_name != self.cfg.GetVGName():
1212
        self.cfg.SetVGName(self.op.vg_name)
1213
      else:
1214
        feedback_fn("Cluster LVM configuration already in desired"
1215
                    " state, not changing")
1216
    if self.op.hvparams:
1217
      self.cluster.hvparams = self.new_hvparams
1218
    if self.op.enabled_hypervisors is not None:
1219
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1220
    if self.op.beparams:
1221
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1222
    self.cfg.Update(self.cluster)
1223

    
1224

    
1225
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1226
  """Sleep and poll for an instance's disk to sync.
1227

1228
  """
1229
  if not instance.disks:
1230
    return True
1231

    
1232
  if not oneshot:
1233
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1234

    
1235
  node = instance.primary_node
1236

    
1237
  for dev in instance.disks:
1238
    lu.cfg.SetDiskID(dev, node)
1239

    
1240
  retries = 0
1241
  while True:
1242
    max_time = 0
1243
    done = True
1244
    cumul_degraded = False
1245
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1246
    if not rstats:
1247
      lu.proc.LogWarning("Can't get any data from node %s" % node)
1248
      retries += 1
1249
      if retries >= 10:
1250
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1251
                                 " aborting." % node)
1252
      time.sleep(6)
1253
      continue
1254
    retries = 0
1255
    for i in range(len(rstats)):
1256
      mstat = rstats[i]
1257
      if mstat is None:
1258
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
1259
                           (node, instance.disks[i].iv_name))
1260
        continue
1261
      # we ignore the ldisk parameter
1262
      perc_done, est_time, is_degraded, _ = mstat
1263
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1264
      if perc_done is not None:
1265
        done = False
1266
        if est_time is not None:
1267
          rem_time = "%d estimated seconds remaining" % est_time
1268
          max_time = est_time
1269
        else:
1270
          rem_time = "no time estimate"
1271
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1272
                        (instance.disks[i].iv_name, perc_done, rem_time))
1273
    if done or oneshot:
1274
      break
1275

    
1276
    time.sleep(min(60, max_time))
1277

    
1278
  if done:
1279
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1280
  return not cumul_degraded
1281

    
1282

    
1283
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1284
  """Check that mirrors are not degraded.
1285

1286
  The ldisk parameter, if True, will change the test from the
1287
  is_degraded attribute (which represents overall non-ok status for
1288
  the device(s)) to the ldisk (representing the local storage status).
1289

1290
  """
1291
  lu.cfg.SetDiskID(dev, node)
1292
  if ldisk:
1293
    idx = 6
1294
  else:
1295
    idx = 5
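  # idx selects the relevant field of the status tuple returned by
  # call_blockdev_find: the overall is_degraded flag by default, or the
  # local-storage (ldisk) status when ldisk is requested (see docstring)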
1296

    
1297
  result = True
1298
  if on_primary or dev.AssembleOnSecondary():
1299
    rstats = lu.rpc.call_blockdev_find(node, dev)
1300
    if not rstats:
1301
      logging.warning("Node %s: disk degraded, not found or node down", node)
1302
      result = False
1303
    else:
1304
      result = result and (not rstats[idx])
1305
  if dev.children:
1306
    for child in dev.children:
1307
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1308

    
1309
  return result
1310

    
1311

    
1312
class LUDiagnoseOS(NoHooksLU):
1313
  """Logical unit for OS diagnose/query.
1314

1315
  """
1316
  _OP_REQP = ["output_fields", "names"]
1317
  REQ_BGL = False
1318

    
1319
  def ExpandNames(self):
1320
    if self.op.names:
1321
      raise errors.OpPrereqError("Selective OS query not supported")
1322

    
1323
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1324
    _CheckOutputFields(static=[],
1325
                       dynamic=self.dynamic_fields,
1326
                       selected=self.op.output_fields)
1327

    
1328
    # Lock all nodes, in shared mode
1329
    self.needed_locks = {}
1330
    self.share_locks[locking.LEVEL_NODE] = 1
1331
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1332

    
1333
  def CheckPrereq(self):
1334
    """Check prerequisites.
1335

1336
    """
1337

    
1338
  @staticmethod
1339
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary
1341

1342
      Args:
1343
        node_list: a list with the names of all nodes
1344
        rlist: a map with node names as keys and OS objects as values
1345

1346
      Returns:
1347
        map: a map with osnames as keys and as value another map, with
1348
             nodes as
1349
             keys and list of OS objects as values
1350
             e.g. {"debian-etch": {"node1": [<object>,...],
1351
                                   "node2": [<object>,]}
1352
                  }
1353

1354
    """
1355
    all_os = {}
1356
    for node_name, nr in rlist.iteritems():
1357
      if not nr:
1358
        continue
1359
      for os_obj in nr:
1360
        if os_obj.name not in all_os:
1361
          # build a list of nodes for this os containing empty lists
1362
          # for each node in node_list
1363
          all_os[os_obj.name] = {}
1364
          for nname in node_list:
1365
            all_os[os_obj.name][nname] = []
1366
        all_os[os_obj.name][node_name].append(os_obj)
1367
    return all_os
1368

    
1369
  def Exec(self, feedback_fn):
1370
    """Compute the list of OSes.
1371

1372
    """
1373
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1374
    node_data = self.rpc.call_os_diagnose(node_list)
1375
    if node_data == False:
1376
      raise errors.OpExecError("Can't gather the list of OSes")
1377
    pol = self._DiagnoseByOS(node_list, node_data)
1378
    output = []
1379
    for os_name, os_data in pol.iteritems():
1380
      row = []
1381
      for field in self.op.output_fields:
1382
        if field == "name":
1383
          val = os_name
1384
        elif field == "valid":
1385
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1386
        elif field == "node_status":
1387
          val = {}
1388
          for node_name, nos_list in os_data.iteritems():
1389
            val[node_name] = [(v.status, v.path) for v in nos_list]
1390
        else:
1391
          raise errors.ParameterError(field)
1392
        row.append(val)
1393
      output.append(row)
1394

    
1395
    return output
1396

    
1397

    
1398
class LURemoveNode(LogicalUnit):
1399
  """Logical unit for removing a node.
1400

1401
  """
1402
  HPATH = "node-remove"
1403
  HTYPE = constants.HTYPE_NODE
1404
  _OP_REQP = ["node_name"]
1405

    
1406
  def BuildHooksEnv(self):
1407
    """Build hooks env.
1408

1409
    This doesn't run on the target node in the pre phase as a failed
1410
    node would then be impossible to remove.
1411

1412
    """
1413
    env = {
1414
      "OP_TARGET": self.op.node_name,
1415
      "NODE_NAME": self.op.node_name,
1416
      }
1417
    all_nodes = self.cfg.GetNodeList()
1418
    all_nodes.remove(self.op.node_name)
1419
    return env, all_nodes, all_nodes
1420

    
1421
  def CheckPrereq(self):
1422
    """Check prerequisites.
1423

1424
    This checks:
1425
     - the node exists in the configuration
1426
     - it does not have primary or secondary instances
1427
     - it's not the master
1428

1429
    Any errors are signalled by raising errors.OpPrereqError.
1430

1431
    """
1432
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1433
    if node is None:
1434
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1435

    
1436
    instance_list = self.cfg.GetInstanceList()
1437

    
1438
    masternode = self.cfg.GetMasterNode()
1439
    if node.name == masternode:
1440
      raise errors.OpPrereqError("Node is the master node,"
1441
                                 " you need to failover first.")
1442

    
1443
    for instance_name in instance_list:
1444
      instance = self.cfg.GetInstanceInfo(instance_name)
1445
      if node.name == instance.primary_node:
1446
        raise errors.OpPrereqError("Instance %s still running on the node,"
1447
                                   " please remove first." % instance_name)
1448
      if node.name in instance.secondary_nodes:
1449
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1450
                                   " please remove first." % instance_name)
1451
    self.op.node_name = node.name
1452
    self.node = node
1453

    
1454
  def Exec(self, feedback_fn):
1455
    """Removes the node from the cluster.
1456

1457
    """
1458
    node = self.node
1459
    logging.info("Stopping the node daemon and removing configs from node %s",
1460
                 node.name)
1461

    
1462
    self.context.RemoveNode(node.name)
1463

    
1464
    self.rpc.call_node_leave_cluster(node.name)
1465

    
1466

    
1467
class LUQueryNodes(NoHooksLU):
1468
  """Logical unit for querying nodes.
1469

1470
  """
1471
  _OP_REQP = ["output_fields", "names"]
1472
  REQ_BGL = False
1473

    
1474
  def ExpandNames(self):
1475
    self.dynamic_fields = frozenset([
1476
      "dtotal", "dfree",
1477
      "mtotal", "mnode", "mfree",
1478
      "bootid",
1479
      "ctotal",
1480
      ])
1481

    
1482
    self.static_fields = frozenset([
1483
      "name", "pinst_cnt", "sinst_cnt",
1484
      "pinst_list", "sinst_list",
1485
      "pip", "sip", "tags",
1486
      "serial_no",
1487
      ])
1488

    
1489
    _CheckOutputFields(static=self.static_fields,
1490
                       dynamic=self.dynamic_fields,
1491
                       selected=self.op.output_fields)
1492

    
1493
    self.needed_locks = {}
1494
    self.share_locks[locking.LEVEL_NODE] = 1
1495

    
1496
    if self.op.names:
1497
      self.wanted = _GetWantedNodes(self, self.op.names)
1498
    else:
1499
      self.wanted = locking.ALL_SET
1500

    
1501
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1502
    if self.do_locking:
1503
      # if we don't request only static fields, we need to lock the nodes
1504
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
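    # Editor's note (sketch, not original code): with the decision above,
    # a query for purely static fields such as ["name", "pinst_cnt"] is
    # answered from the configuration with no node locks at all, while
    # adding a dynamic field such as "mfree" sets do_locking and acquires
    # shared locks on the wanted nodes, e.g. via an opcode along the
    # lines of
    #
    #   opcodes.OpQueryNodes(output_fields=["name", "mfree"], names=[])
    #
    # (the opcode name and attributes are assumptions based on _OP_REQP).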
1505

    
1506

    
1507
  def CheckPrereq(self):
1508
    """Check prerequisites.
1509

1510
    """
1511
    # The validation of the node list is done in _GetWantedNodes if the
    # list is non-empty; an empty list needs no validation
    pass
1514

    
1515
  def Exec(self, feedback_fn):
1516
    """Computes the list of nodes and their attributes.
1517

1518
    """
1519
    all_info = self.cfg.GetAllNodesInfo()
1520
    if self.do_locking:
1521
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1522
    elif self.wanted != locking.ALL_SET:
1523
      nodenames = self.wanted
1524
      missing = set(nodenames).difference(all_info.keys())
1525
      if missing:
1526
        raise errors.OpExecError(
1527
          "Some nodes were removed before retrieving their data: %s" % missing)
1528
    else:
1529
      nodenames = all_info.keys()
1530

    
1531
    nodenames = utils.NiceSort(nodenames)
1532
    nodelist = [all_info[name] for name in nodenames]
1533

    
1534
    # begin data gathering
1535

    
1536
    if self.dynamic_fields.intersection(self.op.output_fields):
1537
      live_data = {}
1538
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1539
                                          self.cfg.GetHypervisorType())
1540
      for name in nodenames:
1541
        nodeinfo = node_data.get(name, None)
1542
        if nodeinfo:
1543
          live_data[name] = {
1544
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1545
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1546
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1547
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1548
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1549
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1550
            "bootid": nodeinfo['bootid'],
1551
            }
1552
        else:
1553
          live_data[name] = {}
1554
    else:
1555
      live_data = dict.fromkeys(nodenames, {})
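    # Editor's sketch of live_data's shape: one entry per queried node,
    # mapping the dynamic fields to integers when call_node_info answered
    # and to an empty dict when it did not, e.g.
    #
    #   {"node1.example.com": {"mtotal": 4096, "mnode": 512, "mfree": 2048,
    #                          "dtotal": 102400, "dfree": 51200,
    #                          "ctotal": 4, "bootid": "..."},
    #    "node2.example.com": {}}
    #
    # (host names and numbers are made up for illustration).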
1556

    
1557
    node_to_primary = dict([(name, set()) for name in nodenames])
1558
    node_to_secondary = dict([(name, set()) for name in nodenames])
1559

    
1560
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1561
                             "sinst_cnt", "sinst_list"))
1562
    if inst_fields & frozenset(self.op.output_fields):
1563
      instancelist = self.cfg.GetInstanceList()
1564

    
1565
      for instance_name in instancelist:
1566
        inst = self.cfg.GetInstanceInfo(instance_name)
1567
        if inst.primary_node in node_to_primary:
1568
          node_to_primary[inst.primary_node].add(inst.name)
1569
        for secnode in inst.secondary_nodes:
1570
          if secnode in node_to_secondary:
1571
            node_to_secondary[secnode].add(inst.name)
1572

    
1573
    # end data gathering
1574

    
1575
    output = []
1576
    for node in nodelist:
1577
      node_output = []
1578
      for field in self.op.output_fields:
1579
        if field == "name":
1580
          val = node.name
1581
        elif field == "pinst_list":
1582
          val = list(node_to_primary[node.name])
1583
        elif field == "sinst_list":
1584
          val = list(node_to_secondary[node.name])
1585
        elif field == "pinst_cnt":
1586
          val = len(node_to_primary[node.name])
1587
        elif field == "sinst_cnt":
1588
          val = len(node_to_secondary[node.name])
1589
        elif field == "pip":
1590
          val = node.primary_ip
1591
        elif field == "sip":
1592
          val = node.secondary_ip
1593
        elif field == "tags":
1594
          val = list(node.GetTags())
1595
        elif field == "serial_no":
1596
          val = node.serial_no
1597
        elif field in self.dynamic_fields:
1598
          val = live_data[node.name].get(field, None)
1599
        else:
1600
          raise errors.ParameterError(field)
1601
        node_output.append(val)
1602
      output.append(node_output)
1603

    
1604
    return output
1605

    
1606

    
1607
class LUQueryNodeVolumes(NoHooksLU):
1608
  """Logical unit for getting volumes on node(s).
1609

1610
  """
1611
  _OP_REQP = ["nodes", "output_fields"]
1612
  REQ_BGL = False
1613

    
1614
  def ExpandNames(self):
1615
    _CheckOutputFields(static=["node"],
1616
                       dynamic=["phys", "vg", "name", "size", "instance"],
1617
                       selected=self.op.output_fields)
1618

    
1619
    self.needed_locks = {}
1620
    self.share_locks[locking.LEVEL_NODE] = 1
1621
    if not self.op.nodes:
1622
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1623
    else:
1624
      self.needed_locks[locking.LEVEL_NODE] = \
1625
        _GetWantedNodes(self, self.op.nodes)
1626

    
1627
  def CheckPrereq(self):
1628
    """Check prerequisites.
1629

1630
    This checks that the fields required are valid output fields.
1631

1632
    """
1633
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1634

    
1635
  def Exec(self, feedback_fn):
1636
    """Computes the list of nodes and their attributes.
1637

1638
    """
1639
    nodenames = self.nodes
1640
    volumes = self.rpc.call_node_volumes(nodenames)
1641

    
1642
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1643
             in self.cfg.GetInstanceList()]
1644

    
1645
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
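    # Editor's sketch of the structure built above: lv_by_node maps each
    # instance object to a dict of node name -> list of its LV names,
    # roughly {inst: {"node1": ["lv-one", ...], ...}}, so the "instance"
    # field below is resolved with membership tests such as
    #
    #   vol['name'] in lv_by_node[inst][node]
    #
    # (the node and LV names here are illustrative only).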
1646

    
1647
    output = []
1648
    for node in nodenames:
1649
      if node not in volumes or not volumes[node]:
1650
        continue
1651

    
1652
      node_vols = volumes[node][:]
1653
      node_vols.sort(key=lambda vol: vol['dev'])
1654

    
1655
      for vol in node_vols:
1656
        node_output = []
1657
        for field in self.op.output_fields:
1658
          if field == "node":
1659
            val = node
1660
          elif field == "phys":
1661
            val = vol['dev']
1662
          elif field == "vg":
1663
            val = vol['vg']
1664
          elif field == "name":
1665
            val = vol['name']
1666
          elif field == "size":
1667
            val = int(float(vol['size']))
1668
          elif field == "instance":
1669
            for inst in ilist:
1670
              if node not in lv_by_node[inst]:
1671
                continue
1672
              if vol['name'] in lv_by_node[inst][node]:
1673
                val = inst.name
1674
                break
1675
            else:
1676
              val = '-'
1677
          else:
1678
            raise errors.ParameterError(field)
1679
          node_output.append(str(val))
1680

    
1681
        output.append(node_output)
1682

    
1683
    return output
1684

    
1685

    
1686
class LUAddNode(LogicalUnit):
1687
  """Logical unit for adding node to the cluster.
1688

1689
  """
1690
  HPATH = "node-add"
1691
  HTYPE = constants.HTYPE_NODE
1692
  _OP_REQP = ["node_name"]
1693

    
1694
  def BuildHooksEnv(self):
1695
    """Build hooks env.
1696

1697
    This will run on all nodes before, and on all nodes + the new node after.
1698

1699
    """
1700
    env = {
1701
      "OP_TARGET": self.op.node_name,
1702
      "NODE_NAME": self.op.node_name,
1703
      "NODE_PIP": self.op.primary_ip,
1704
      "NODE_SIP": self.op.secondary_ip,
1705
      }
1706
    nodes_0 = self.cfg.GetNodeList()
1707
    nodes_1 = nodes_0 + [self.op.node_name, ]
1708
    return env, nodes_0, nodes_1
1709

    
1710
  def CheckPrereq(self):
1711
    """Check prerequisites.
1712

1713
    This checks:
1714
     - the new node is not already in the config
1715
     - it is resolvable
1716
     - its parameters (single/dual homed) match the cluster
1717

1718
    Any errors are signalled by raising errors.OpPrereqError.
1719

1720
    """
1721
    node_name = self.op.node_name
1722
    cfg = self.cfg
1723

    
1724
    dns_data = utils.HostInfo(node_name)
1725

    
1726
    node = dns_data.name
1727
    primary_ip = self.op.primary_ip = dns_data.ip
1728
    secondary_ip = getattr(self.op, "secondary_ip", None)
1729
    if secondary_ip is None:
1730
      secondary_ip = primary_ip
1731
    if not utils.IsValidIP(secondary_ip):
1732
      raise errors.OpPrereqError("Invalid secondary IP given")
1733
    self.op.secondary_ip = secondary_ip
1734

    
1735
    node_list = cfg.GetNodeList()
1736
    if not self.op.readd and node in node_list:
1737
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1738
                                 node)
1739
    elif self.op.readd and node not in node_list:
1740
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1741

    
1742
    for existing_node_name in node_list:
1743
      existing_node = cfg.GetNodeInfo(existing_node_name)
1744

    
1745
      if self.op.readd and node == existing_node_name:
1746
        if (existing_node.primary_ip != primary_ip or
1747
            existing_node.secondary_ip != secondary_ip):
1748
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1749
                                     " address configuration as before")
1750
        continue
1751

    
1752
      if (existing_node.primary_ip == primary_ip or
1753
          existing_node.secondary_ip == primary_ip or
1754
          existing_node.primary_ip == secondary_ip or
1755
          existing_node.secondary_ip == secondary_ip):
1756
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1757
                                   " existing node %s" % existing_node.name)
1758

    
1759
    # check that the type of the node (single versus dual homed) is the
1760
    # same as for the master
1761
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1762
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1763
    newbie_singlehomed = secondary_ip == primary_ip
1764
    if master_singlehomed != newbie_singlehomed:
1765
      if master_singlehomed:
1766
        raise errors.OpPrereqError("The master has no private ip but the"
1767
                                   " new node has one")
1768
      else:
1769
        raise errors.OpPrereqError("The master has a private ip but the"
1770
                                   " new node doesn't have one")
1771

    
1772
    # check reachability
1773
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1774
      raise errors.OpPrereqError("Node not reachable by ping")
1775

    
1776
    if not newbie_singlehomed:
1777
      # check reachability from my secondary ip to newbie's secondary ip
1778
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1779
                           source=myself.secondary_ip):
1780
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1781
                                   " based ping to noded port")
1782

    
1783
    self.new_node = objects.Node(name=node,
1784
                                 primary_ip=primary_ip,
1785
                                 secondary_ip=secondary_ip)
1786

    
1787
  def Exec(self, feedback_fn):
1788
    """Adds the new node to the cluster.
1789

1790
    """
1791
    new_node = self.new_node
1792
    node = new_node.name
1793

    
1794
    # check connectivity
1795
    result = self.rpc.call_version([node])[node]
1796
    if result:
1797
      if constants.PROTOCOL_VERSION == result:
1798
        logging.info("Communication to node %s fine, sw version %s match",
1799
                     node, result)
1800
      else:
1801
        raise errors.OpExecError("Version mismatch master version %s,"
1802
                                 " node version %s" %
1803
                                 (constants.PROTOCOL_VERSION, result))
1804
    else:
1805
      raise errors.OpExecError("Cannot get version from the new node")
1806

    
1807
    # setup ssh on node
1808
    logging.info("Copy ssh key to node %s", node)
1809
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1810
    keyarray = []
1811
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1812
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1813
                priv_key, pub_key]
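    # The files are read below in this exact order, so keyarray[0..5] end
    # up as: host DSA private/public key, host RSA private/public key, and
    # the cluster user's private/public SSH key (from GetUserFiles above).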
1814

    
1815
    for i in keyfiles:
1816
      f = open(i, 'r')
1817
      try:
1818
        keyarray.append(f.read())
1819
      finally:
1820
        f.close()
1821

    
1822
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1823
                                    keyarray[2],
1824
                                    keyarray[3], keyarray[4], keyarray[5])
1825

    
1826
    if not result:
1827
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1828

    
1829
    # Add node to our /etc/hosts, and add key to known_hosts
1830
    utils.AddHostToEtcHosts(new_node.name)
1831

    
1832
    if new_node.secondary_ip != new_node.primary_ip:
1833
      if not self.rpc.call_node_has_ip_address(new_node.name,
1834
                                               new_node.secondary_ip):
1835
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1836
                                 " you gave (%s). Please fix and re-run this"
1837
                                 " command." % new_node.secondary_ip)
1838

    
1839
    node_verify_list = [self.cfg.GetMasterNode()]
1840
    node_verify_param = {
1841
      'nodelist': [node],
1842
      # TODO: do a node-net-test as well?
1843
    }
1844

    
1845
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1846
                                       self.cfg.GetClusterName())
1847
    for verifier in node_verify_list:
1848
      if not result[verifier]:
1849
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1850
                                 " for remote verification" % verifier)
1851
      if result[verifier]['nodelist']:
1852
        for failed in result[verifier]['nodelist']:
1853
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1854
                      (verifier, result[verifier]['nodelist'][failed]))
1855
        raise errors.OpExecError("ssh/hostname verification failed.")
1856

    
1857
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1858
    # including the node just added
1859
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1860
    dist_nodes = self.cfg.GetNodeList()
1861
    if not self.op.readd:
1862
      dist_nodes.append(node)
1863
    if myself.name in dist_nodes:
1864
      dist_nodes.remove(myself.name)
1865

    
1866
    logging.debug("Copying hosts and known_hosts to all nodes")
1867
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1868
      result = self.rpc.call_upload_file(dist_nodes, fname)
1869
      for to_node in dist_nodes:
1870
        if not result[to_node]:
1871
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1872

    
1873
    to_copy = []
1874
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1875
      to_copy.append(constants.VNC_PASSWORD_FILE)
1876
    for fname in to_copy:
1877
      result = self.rpc.call_upload_file([node], fname)
1878
      if not result[node]:
1879
        logging.error("Could not copy file %s to node %s", fname, node)
1880

    
1881
    if self.op.readd:
1882
      self.context.ReaddNode(new_node)
1883
    else:
1884
      self.context.AddNode(new_node)
1885

    
1886

    
1887
class LUQueryClusterInfo(NoHooksLU):
1888
  """Query cluster configuration.
1889

1890
  """
1891
  _OP_REQP = []
1892
  REQ_MASTER = False
1893
  REQ_BGL = False
1894

    
1895
  def ExpandNames(self):
1896
    self.needed_locks = {}
1897

    
1898
  def CheckPrereq(self):
1899
    """No prerequsites needed for this LU.
1900

1901
    """
1902
    pass
1903

    
1904
  def Exec(self, feedback_fn):
1905
    """Return cluster config.
1906

1907
    """
1908
    cluster = self.cfg.GetClusterInfo()
1909
    result = {
1910
      "software_version": constants.RELEASE_VERSION,
1911
      "protocol_version": constants.PROTOCOL_VERSION,
1912
      "config_version": constants.CONFIG_VERSION,
1913
      "os_api_version": constants.OS_API_VERSION,
1914
      "export_version": constants.EXPORT_VERSION,
1915
      "architecture": (platform.architecture()[0], platform.machine()),
1916
      "name": cluster.cluster_name,
1917
      "master": cluster.master_node,
1918
      "default_hypervisor": cluster.default_hypervisor,
1919
      "enabled_hypervisors": cluster.enabled_hypervisors,
1920
      "hvparams": cluster.hvparams,
1921
      "beparams": cluster.beparams,
1922
      }
1923

    
1924
    return result
1925

    
1926

    
1927
class LUQueryConfigValues(NoHooksLU):
1928
  """Return configuration values.
1929

1930
  """
1931
  _OP_REQP = []
1932
  REQ_BGL = False
1933

    
1934
  def ExpandNames(self):
1935
    self.needed_locks = {}
1936

    
1937
    static_fields = ["cluster_name", "master_node", "drain_flag"]
1938
    _CheckOutputFields(static=static_fields,
1939
                       dynamic=[],
1940
                       selected=self.op.output_fields)
1941

    
1942
  def CheckPrereq(self):
1943
    """No prerequisites.
1944

1945
    """
1946
    pass
1947

    
1948
  def Exec(self, feedback_fn):
1949
    """Dump a representation of the cluster config to the standard output.
1950

1951
    """
1952
    values = []
1953
    for field in self.op.output_fields:
1954
      if field == "cluster_name":
1955
        entry = self.cfg.GetClusterName()
1956
      elif field == "master_node":
1957
        entry = self.cfg.GetMasterNode()
1958
      elif field == "drain_flag":
1959
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1960
      else:
1961
        raise errors.ParameterError(field)
1962
      values.append(entry)
1963
    return values
1964

    
1965

    
1966
class LUActivateInstanceDisks(NoHooksLU):
1967
  """Bring up an instance's disks.
1968

1969
  """
1970
  _OP_REQP = ["instance_name"]
1971
  REQ_BGL = False
1972

    
1973
  def ExpandNames(self):
1974
    self._ExpandAndLockInstance()
1975
    self.needed_locks[locking.LEVEL_NODE] = []
1976
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1977

    
1978
  def DeclareLocks(self, level):
1979
    if level == locking.LEVEL_NODE:
1980
      self._LockInstancesNodes()
1981

    
1982
  def CheckPrereq(self):
1983
    """Check prerequisites.
1984

1985
    This checks that the instance is in the cluster.
1986

1987
    """
1988
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1989
    assert self.instance is not None, \
1990
      "Cannot retrieve locked instance %s" % self.op.instance_name
1991

    
1992
  def Exec(self, feedback_fn):
1993
    """Activate the disks.
1994

1995
    """
1996
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1997
    if not disks_ok:
1998
      raise errors.OpExecError("Cannot activate block devices")
1999

    
2000
    return disks_info
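    # Editor's note: per _AssembleInstanceDisks below, disks_info is the
    # device_info list built there, i.e. one tuple per instance disk of
    #
    #   (primary node name, disk iv_name, result of call_blockdev_assemble)
    #
    # as appended to device_info in that helper.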
2001

    
2002

    
2003
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2004
  """Prepare the block devices for an instance.
2005

2006
  This sets up the block devices on all nodes.
2007

2008
  Args:
2009
    instance: a ganeti.objects.Instance object
2010
    ignore_secondaries: if true, errors on secondary nodes won't result
2011
                        in an error return from the function
2012

2013
  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
2018
  device_info = []
2019
  disks_ok = True
2020
  iname = instance.name
2021
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before the handshake has occurred, but we do not eliminate it
2024

    
2025
  # The proper fix would be to wait (with some limits) until the
2026
  # connection has been made and drbd transitions from WFConnection
2027
  # into any other network-connected state (Connected, SyncTarget,
2028
  # SyncSource, etc.)
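  # Editor's sketch of the "proper fix" mentioned above (an assumption,
  # not implemented here): before the second pass, poll the DRBD
  # connection state with a bounded timeout, roughly
  #
  #   for _ in range(timeout):
  #     if every disk reports a state other than "WFConnection":
  #       break
  #     time.sleep(1)
  #
  # using whatever RPC would expose the DRBD status; no such call is
  # assumed to exist in this module.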
2029

    
2030
  # 1st pass, assemble on all nodes in secondary mode
2031
  for inst_disk in instance.disks:
2032
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2033
      lu.cfg.SetDiskID(node_disk, node)
2034
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2035
      if not result:
2036
        logging.error("Could not prepare block device %s on node %s"
2037
                      " (is_primary=False, pass=1)", inst_disk.iv_name, node)
2038
        if not ignore_secondaries:
2039
          disks_ok = False
2040

    
2041
  # FIXME: race condition on drbd migration to primary
2042

    
2043
  # 2nd pass, do only the primary node
2044
  for inst_disk in instance.disks:
2045
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2046
      if node != instance.primary_node:
2047
        continue
2048
      lu.cfg.SetDiskID(node_disk, node)
2049
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2050
      if not result:
2051
        logging.error("Could not prepare block device %s on node %s"
2052
                      " (is_primary=True, pass=2)", inst_disk.iv_name, node)
2053
        disks_ok = False
2054
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2055

    
2056
  # leave the disks configured for the primary node
2057
  # this is a workaround that would be fixed better by
2058
  # improving the logical/physical id handling
2059
  for disk in instance.disks:
2060
    lu.cfg.SetDiskID(disk, instance.primary_node)
2061

    
2062
  return disks_ok, device_info
2063

    
2064

    
2065
def _StartInstanceDisks(lu, instance, force):
2066
  """Start the disks of an instance.
2067

2068
  """
2069
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2070
                                           ignore_secondaries=force)
2071
  if not disks_ok:
2072
    _ShutdownInstanceDisks(lu, instance)
2073
    if force is not None and not force:
2074
      logging.error("If the message above refers to a secondary node,"
2075
                    " you can retry the operation using '--force'.")
2076
    raise errors.OpExecError("Disk consistency error")
2077

    
2078

    
2079
class LUDeactivateInstanceDisks(NoHooksLU):
2080
  """Shutdown an instance's disks.
2081

2082
  """
2083
  _OP_REQP = ["instance_name"]
2084
  REQ_BGL = False
2085

    
2086
  def ExpandNames(self):
2087
    self._ExpandAndLockInstance()
2088
    self.needed_locks[locking.LEVEL_NODE] = []
2089
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2090

    
2091
  def DeclareLocks(self, level):
2092
    if level == locking.LEVEL_NODE:
2093
      self._LockInstancesNodes()
2094

    
2095
  def CheckPrereq(self):
2096
    """Check prerequisites.
2097

2098
    This checks that the instance is in the cluster.
2099

2100
    """
2101
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2102
    assert self.instance is not None, \
2103
      "Cannot retrieve locked instance %s" % self.op.instance_name
2104

    
2105
  def Exec(self, feedback_fn):
2106
    """Deactivate the disks
2107

2108
    """
2109
    instance = self.instance
2110
    _SafeShutdownInstanceDisks(self, instance)
2111

    
2112

    
2113
def _SafeShutdownInstanceDisks(lu, instance):
2114
  """Shutdown block devices of an instance.
2115

2116
  This function checks if an instance is running, before calling
2117
  _ShutdownInstanceDisks.
2118

2119
  """
2120
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2121
                                      [instance.hypervisor])
2122
  ins_l = ins_l[instance.primary_node]
2123
  if not isinstance(ins_l, list):
2124
    raise errors.OpExecError("Can't contact node '%s'" %
2125
                             instance.primary_node)
2126

    
2127
  if instance.name in ins_l:
2128
    raise errors.OpExecError("Instance is running, can't shutdown"
2129
                             " block devices.")
2130

    
2131
  _ShutdownInstanceDisks(lu, instance)
2132

    
2133

    
2134
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2135
  """Shutdown block devices of an instance.
2136

2137
  This does the shutdown on all nodes of the instance.
2138

2139
  If ignore_primary is true, errors on the primary node are
  ignored.
2141

2142
  """
2143
  result = True
2144
  for disk in instance.disks:
2145
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2146
      lu.cfg.SetDiskID(top_disk, node)
2147
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2148
        logging.error("Could not shutdown block device %s on node %s",
2149
                      disk.iv_name, node)
2150
        if not ignore_primary or node != instance.primary_node:
2151
          result = False
2152
  return result
2153

    
2154

    
2155
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2156
  """Checks if a node has enough free memory.
2157

2158
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2162

2163
  @type lu: C{LogicalUnit}
2164
  @param lu: a logical unit from which we get configuration data
2165
  @type node: C{str}
2166
  @param node: the node to check
2167
  @type reason: C{str}
2168
  @param reason: string to use in the error message
2169
  @type requested: C{int}
2170
  @param requested: the amount of memory in MiB to check for
2171
  @type hypervisor: C{str}
2172
  @param hypervisor: the hypervisor to ask for memory stats
2173
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2174
      we cannot check the node
2175

2176
  """
2177
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2178
  if not nodeinfo or not isinstance(nodeinfo, dict):
2179
    raise errors.OpPrereqError("Could not contact node %s for resource"
2180
                             " information" % (node,))
2181

    
2182
  free_mem = nodeinfo[node].get('memory_free')
2183
  if not isinstance(free_mem, int):
2184
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2185
                             " was '%s'" % (node, free_mem))
2186
  if requested > free_mem:
2187
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2188
                             " needed %s MiB, available %s MiB" %
2189
                             (node, reason, requested, free_mem))
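# Editor's usage sketch for _CheckNodeFreeMemory, mirroring the call made
# in LUStartupInstance.CheckPrereq below:
#
#   bep = self.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)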
2190

    
2191

    
2192
class LUStartupInstance(LogicalUnit):
2193
  """Starts an instance.
2194

2195
  """
2196
  HPATH = "instance-start"
2197
  HTYPE = constants.HTYPE_INSTANCE
2198
  _OP_REQP = ["instance_name", "force"]
2199
  REQ_BGL = False
2200

    
2201
  def ExpandNames(self):
2202
    self._ExpandAndLockInstance()
2203
    self.needed_locks[locking.LEVEL_NODE] = []
2204
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2205

    
2206
  def DeclareLocks(self, level):
2207
    if level == locking.LEVEL_NODE:
2208
      self._LockInstancesNodes()
2209

    
2210
  def BuildHooksEnv(self):
2211
    """Build hooks env.
2212

2213
    This runs on master, primary and secondary nodes of the instance.
2214

2215
    """
2216
    env = {
2217
      "FORCE": self.op.force,
2218
      }
2219
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2220
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2221
          list(self.instance.secondary_nodes))
2222
    return env, nl, nl
2223

    
2224
  def CheckPrereq(self):
2225
    """Check prerequisites.
2226

2227
    This checks that the instance is in the cluster.
2228

2229
    """
2230
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2231
    assert self.instance is not None, \
2232
      "Cannot retrieve locked instance %s" % self.op.instance_name
2233

    
2234
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2235
    # check bridge existence
2236
    _CheckInstanceBridgesExist(self, instance)
2237

    
2238
    _CheckNodeFreeMemory(self, instance.primary_node,
2239
                         "starting instance %s" % instance.name,
2240
                         bep[constants.BE_MEMORY], instance.hypervisor)
2241

    
2242
  def Exec(self, feedback_fn):
2243
    """Start the instance.
2244

2245
    """
2246
    instance = self.instance
2247
    force = self.op.force
2248
    extra_args = getattr(self.op, "extra_args", "")
2249

    
2250
    self.cfg.MarkInstanceUp(instance.name)
2251

    
2252
    node_current = instance.primary_node
2253

    
2254
    _StartInstanceDisks(self, instance, force)
2255

    
2256
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2257
      _ShutdownInstanceDisks(self, instance)
2258
      raise errors.OpExecError("Could not start instance")
2259

    
2260

    
2261
class LURebootInstance(LogicalUnit):
2262
  """Reboot an instance.
2263

2264
  """
2265
  HPATH = "instance-reboot"
2266
  HTYPE = constants.HTYPE_INSTANCE
2267
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2268
  REQ_BGL = False
2269

    
2270
  def ExpandNames(self):
2271
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2272
                                   constants.INSTANCE_REBOOT_HARD,
2273
                                   constants.INSTANCE_REBOOT_FULL]:
2274
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2275
                                  (constants.INSTANCE_REBOOT_SOFT,
2276
                                   constants.INSTANCE_REBOOT_HARD,
2277
                                   constants.INSTANCE_REBOOT_FULL))
2278
    self._ExpandAndLockInstance()
2279
    self.needed_locks[locking.LEVEL_NODE] = []
2280
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2281

    
2282
  def DeclareLocks(self, level):
2283
    if level == locking.LEVEL_NODE:
2284
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2285
      self._LockInstancesNodes(primary_only=primary_only)
2286

    
2287
  def BuildHooksEnv(self):
2288
    """Build hooks env.
2289

2290
    This runs on master, primary and secondary nodes of the instance.
2291

2292
    """
2293
    env = {
2294
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2295
      }
2296
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2297
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2298
          list(self.instance.secondary_nodes))
2299
    return env, nl, nl
2300

    
2301
  def CheckPrereq(self):
2302
    """Check prerequisites.
2303

2304
    This checks that the instance is in the cluster.
2305

2306
    """
2307
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2308
    assert self.instance is not None, \
2309
      "Cannot retrieve locked instance %s" % self.op.instance_name
2310

    
2311
    # check bridge existence
2312
    _CheckInstanceBridgesExist(self, instance)
2313

    
2314
  def Exec(self, feedback_fn):
2315
    """Reboot the instance.
2316

2317
    """
2318
    instance = self.instance
2319
    ignore_secondaries = self.op.ignore_secondaries
2320
    reboot_type = self.op.reboot_type
2321
    extra_args = getattr(self.op, "extra_args", "")
2322

    
2323
    node_current = instance.primary_node
2324

    
2325
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2326
                       constants.INSTANCE_REBOOT_HARD]:
2327
      if not self.rpc.call_instance_reboot(node_current, instance,
2328
                                           reboot_type, extra_args):
2329
        raise errors.OpExecError("Could not reboot instance")
2330
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")
2338

    
2339
    self.cfg.MarkInstanceUp(instance.name)
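    # Editor's summary of the branch above: SOFT and HARD reboots are
    # delegated to call_instance_reboot on the primary node; a FULL reboot
    # instead shuts the instance down, restarts its disks (optionally
    # ignoring errors on secondary nodes) and starts it again. In all
    # cases the instance is then marked as up in the configuration.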
2340

    
2341

    
2342
class LUShutdownInstance(LogicalUnit):
2343
  """Shutdown an instance.
2344

2345
  """
2346
  HPATH = "instance-stop"
2347
  HTYPE = constants.HTYPE_INSTANCE
2348
  _OP_REQP = ["instance_name"]
2349
  REQ_BGL = False
2350

    
2351
  def ExpandNames(self):
2352
    self._ExpandAndLockInstance()
2353
    self.needed_locks[locking.LEVEL_NODE] = []
2354
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2355

    
2356
  def DeclareLocks(self, level):
2357
    if level == locking.LEVEL_NODE:
2358
      self._LockInstancesNodes()
2359

    
2360
  def BuildHooksEnv(self):
2361
    """Build hooks env.
2362

2363
    This runs on master, primary and secondary nodes of the instance.
2364

2365
    """
2366
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2367
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2368
          list(self.instance.secondary_nodes))
2369
    return env, nl, nl
2370

    
2371
  def CheckPrereq(self):
2372
    """Check prerequisites.
2373

2374
    This checks that the instance is in the cluster.
2375

2376
    """
2377
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2378
    assert self.instance is not None, \
2379
      "Cannot retrieve locked instance %s" % self.op.instance_name
2380

    
2381
  def Exec(self, feedback_fn):
2382
    """Shutdown the instance.
2383

2384
    """
2385
    instance = self.instance
2386
    node_current = instance.primary_node
2387
    self.cfg.MarkInstanceDown(instance.name)
2388
    if not self.rpc.call_instance_shutdown(node_current, instance):
2389
      logging.error("Could not shutdown instance")
2390

    
2391
    _ShutdownInstanceDisks(self, instance)
2392

    
2393

    
2394
class LUReinstallInstance(LogicalUnit):
2395
  """Reinstall an instance.
2396

2397
  """
2398
  HPATH = "instance-reinstall"
2399
  HTYPE = constants.HTYPE_INSTANCE
2400
  _OP_REQP = ["instance_name"]
2401
  REQ_BGL = False
2402

    
2403
  def ExpandNames(self):
2404
    self._ExpandAndLockInstance()
2405
    self.needed_locks[locking.LEVEL_NODE] = []
2406
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2407

    
2408
  def DeclareLocks(self, level):
2409
    if level == locking.LEVEL_NODE:
2410
      self._LockInstancesNodes()
2411

    
2412
  def BuildHooksEnv(self):
2413
    """Build hooks env.
2414

2415
    This runs on master, primary and secondary nodes of the instance.
2416

2417
    """
2418
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2419
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420
          list(self.instance.secondary_nodes))
2421
    return env, nl, nl
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the instance is in the cluster and is not running.
2427

2428
    """
2429
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430
    assert instance is not None, \
2431
      "Cannot retrieve locked instance %s" % self.op.instance_name
2432

    
2433
    if instance.disk_template == constants.DT_DISKLESS:
2434
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2435
                                 self.op.instance_name)
2436
    if instance.status != "down":
2437
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2438
                                 self.op.instance_name)
2439
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2440
                                              instance.name,
2441
                                              instance.hypervisor)
2442
    if remote_info:
2443
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2444
                                 (self.op.instance_name,
2445
                                  instance.primary_node))
2446

    
2447
    self.op.os_type = getattr(self.op, "os_type", None)
2448
    if self.op.os_type is not None:
2449
      # OS verification
2450
      pnode = self.cfg.GetNodeInfo(
2451
        self.cfg.ExpandNodeName(instance.primary_node))
2452
      if pnode is None:
2453
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2454
                                   self.op.pnode)
2455
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2456
      if not os_obj:
2457
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2458
                                   " primary node"  % self.op.os_type)
2459

    
2460
    self.instance = instance
2461

    
2462
  def Exec(self, feedback_fn):
2463
    """Reinstall the instance.
2464

2465
    """
2466
    inst = self.instance
2467

    
2468
    if self.op.os_type is not None:
2469
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2470
      inst.os = self.op.os_type
2471
      self.cfg.Update(inst)
2472

    
2473
    _StartInstanceDisks(self, inst, None)
2474
    try:
2475
      feedback_fn("Running the instance OS create scripts...")
2476
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2477
                                           "sda", "sdb"):
2478
        raise errors.OpExecError("Could not install OS for instance %s"
2479
                                 " on node %s" %
2480
                                 (inst.name, inst.primary_node))
2481
    finally:
2482
      _ShutdownInstanceDisks(self, inst)
2483

    
2484

    
2485
class LURenameInstance(LogicalUnit):
2486
  """Rename an instance.
2487

2488
  """
2489
  HPATH = "instance-rename"
2490
  HTYPE = constants.HTYPE_INSTANCE
2491
  _OP_REQP = ["instance_name", "new_name"]
2492

    
2493
  def BuildHooksEnv(self):
2494
    """Build hooks env.
2495

2496
    This runs on master, primary and secondary nodes of the instance.
2497

2498
    """
2499
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2500
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2501
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2502
          list(self.instance.secondary_nodes))
2503
    return env, nl, nl
2504

    
2505
  def CheckPrereq(self):
2506
    """Check prerequisites.
2507

2508
    This checks that the instance is in the cluster and is not running.
2509

2510
    """
2511
    instance = self.cfg.GetInstanceInfo(
2512
      self.cfg.ExpandInstanceName(self.op.instance_name))
2513
    if instance is None:
2514
      raise errors.OpPrereqError("Instance '%s' not known" %
2515
                                 self.op.instance_name)
2516
    if instance.status != "down":
2517
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2518
                                 self.op.instance_name)
2519
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2520
                                              instance.name,
2521
                                              instance.hypervisor)
2522
    if remote_info:
2523
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2524
                                 (self.op.instance_name,
2525
                                  instance.primary_node))
2526
    self.instance = instance
2527

    
2528
    # new name verification
2529
    name_info = utils.HostInfo(self.op.new_name)
2530

    
2531
    self.op.new_name = new_name = name_info.name
2532
    instance_list = self.cfg.GetInstanceList()
2533
    if new_name in instance_list:
2534
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2535
                                 new_name)
2536

    
2537
    if not getattr(self.op, "ignore_ip", False):
2538
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2539
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2540
                                   (name_info.ip, new_name))
2541

    
2542

    
2543
  def Exec(self, feedback_fn):
2544
    """Reinstall the instance.
2545

2546
    """
2547
    inst = self.instance
2548
    old_name = inst.name
2549

    
2550
    if inst.disk_template == constants.DT_FILE:
2551
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2552

    
2553
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2554
    # Change the instance lock. This is definitely safe while we hold the BGL
2555
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2556
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2557

    
2558
    # re-read the instance from the configuration after rename
2559
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2560

    
2561
    if inst.disk_template == constants.DT_FILE:
2562
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2563
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2564
                                                     old_file_storage_dir,
2565
                                                     new_file_storage_dir)
2566

    
2567
      if not result:
2568
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2569
                                 " directory '%s' to '%s' (but the instance"
2570
                                 " has been renamed in Ganeti)" % (
2571
                                 inst.primary_node, old_file_storage_dir,
2572
                                 new_file_storage_dir))
2573

    
2574
      if not result[0]:
2575
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2576
                                 " (but the instance has been renamed in"
2577
                                 " Ganeti)" % (old_file_storage_dir,
2578
                                               new_file_storage_dir))
2579

    
2580
    _StartInstanceDisks(self, inst, None)
2581
    try:
2582
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2583
                                               old_name):
2584
        msg = ("Could not run OS rename script for instance %s on node %s"
2585
               " (but the instance has been renamed in Ganeti)" %
2586
               (inst.name, inst.primary_node))
2587
        logging.error(msg)
2588
    finally:
2589
      _ShutdownInstanceDisks(self, inst)
2590

    
2591

    
2592
class LURemoveInstance(LogicalUnit):
2593
  """Remove an instance.
2594

2595
  """
2596
  HPATH = "instance-remove"
2597
  HTYPE = constants.HTYPE_INSTANCE
2598
  _OP_REQP = ["instance_name", "ignore_failures"]
2599
  REQ_BGL = False
2600

    
2601
  def ExpandNames(self):
2602
    self._ExpandAndLockInstance()
2603
    self.needed_locks[locking.LEVEL_NODE] = []
2604
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2605

    
2606
  def DeclareLocks(self, level):
2607
    if level == locking.LEVEL_NODE:
2608
      self._LockInstancesNodes()
2609

    
2610
  def BuildHooksEnv(self):
2611
    """Build hooks env.
2612

2613
    This runs on master, primary and secondary nodes of the instance.
2614

2615
    """
2616
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2617
    nl = [self.cfg.GetMasterNode()]
2618
    return env, nl, nl
2619

    
2620
  def CheckPrereq(self):
2621
    """Check prerequisites.
2622

2623
    This checks that the instance is in the cluster.
2624

2625
    """
2626
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2627
    assert self.instance is not None, \
2628
      "Cannot retrieve locked instance %s" % self.op.instance_name
2629

    
2630
  def Exec(self, feedback_fn):
2631
    """Remove the instance.
2632

2633
    """
2634
    instance = self.instance
2635
    logging.info("Shutting down instance %s on node %s",
2636
                 instance.name, instance.primary_node)
2637

    
2638
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2639
      if self.op.ignore_failures:
2640
        feedback_fn("Warning: can't shutdown instance")
2641
      else:
2642
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2643
                                 (instance.name, instance.primary_node))
2644

    
2645
    logging.info("Removing block devices for instance %s", instance.name)
2646

    
2647
    if not _RemoveDisks(self, instance):
2648
      if self.op.ignore_failures:
2649
        feedback_fn("Warning: can't remove instance's disks")
2650
      else:
2651
        raise errors.OpExecError("Can't remove instance's disks")
2652

    
2653
    logging.info("Removing instance %s out of cluster config", instance.name)
2654

    
2655
    self.cfg.RemoveInstance(instance.name)
2656
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2657

    
2658

    
2659
class LUQueryInstances(NoHooksLU):
2660
  """Logical unit for querying instances.
2661

2662
  """
2663
  _OP_REQP = ["output_fields", "names"]
2664
  REQ_BGL = False
2665

    
2666
  def ExpandNames(self):
2667
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2668
    hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2669
    bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2670
    self.static_fields = frozenset([
2671
      "name", "os", "pnode", "snodes",
2672
      "admin_state", "admin_ram",
2673
      "disk_template", "ip", "mac", "bridge",
2674
      "sda_size", "sdb_size", "vcpus", "tags",
2675
      "network_port",
2676
      "serial_no", "hypervisor", "hvparams",
2677
      ] + hvp + bep)
2678

    
2679
    _CheckOutputFields(static=self.static_fields,
2680
                       dynamic=self.dynamic_fields,
2681
                       selected=self.op.output_fields)
2682

    
2683
    self.needed_locks = {}
2684
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2685
    self.share_locks[locking.LEVEL_NODE] = 1
2686

    
2687
    if self.op.names:
2688
      self.wanted = _GetWantedInstances(self, self.op.names)
2689
    else:
2690
      self.wanted = locking.ALL_SET
2691

    
2692
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2693
    if self.do_locking:
2694
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2695
      self.needed_locks[locking.LEVEL_NODE] = []
2696
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2697

    
2698
  def DeclareLocks(self, level):
2699
    if level == locking.LEVEL_NODE and self.do_locking:
2700
      self._LockInstancesNodes()
2701

    
2702
  def CheckPrereq(self):
2703
    """Check prerequisites.
2704

2705
    """
2706
    pass
2707

    
2708
  def Exec(self, feedback_fn):
2709
    """Computes the list of nodes and their attributes.
2710

2711
    """
2712
    all_info = self.cfg.GetAllInstancesInfo()
2713
    if self.do_locking:
2714
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2715
    elif self.wanted != locking.ALL_SET:
2716
      instance_names = self.wanted
2717
      missing = set(instance_names).difference(all_info.keys())
2718
      if missing:
2719
        raise errors.OpExecError(
2720
          "Some instances were removed before retrieving their data: %s"
2721
          % missing)
2722
    else:
2723
      instance_names = all_info.keys()
2724

    
2725
    instance_names = utils.NiceSort(instance_names)
2726
    instance_list = [all_info[iname] for iname in instance_names]
2727

    
2728
    # begin data gathering
2729

    
2730
    nodes = frozenset([inst.primary_node for inst in instance_list])
2731
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2732

    
2733
    bad_nodes = []
2734
    if self.dynamic_fields.intersection(self.op.output_fields):
2735
      live_data = {}
2736
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2737
      for name in nodes:
2738
        result = node_data[name]
2739
        if result:
2740
          live_data.update(result)
2741
        elif result is False:
2742
          bad_nodes.append(name)
2743
        # else no instance is alive
2744
    else:
2745
      live_data = dict([(name, {}) for name in instance_names])
2746

    
2747
    # end data gathering
2748

    
2749
    HVPREFIX = "hv/"
2750
    BEPREFIX = "be/"
2751
    output = []
2752
    for instance in instance_list:
2753
      iout = []
2754
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2755
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2756
      for field in self.op.output_fields:
2757
        if field == "name":
2758
          val = instance.name
2759
        elif field == "os":
2760
          val = instance.os
2761
        elif field == "pnode":
2762
          val = instance.primary_node
2763
        elif field == "snodes":
2764
          val = list(instance.secondary_nodes)
2765
        elif field == "admin_state":
2766
          val = (instance.status != "down")
2767
        elif field == "oper_state":
2768
          if instance.primary_node in bad_nodes:
2769
            val = None
2770
          else:
2771
            val = bool(live_data.get(instance.name))
2772
        elif field == "status":
2773
          if instance.primary_node in bad_nodes:
2774
            val = "ERROR_nodedown"
2775
          else:
2776
            running = bool(live_data.get(instance.name))
2777
            if running:
2778
              if instance.status != "down":
2779
                val = "running"
2780
              else:
2781
                val = "ERROR_up"
2782
            else:
2783
              if instance.status != "down":
2784
                val = "ERROR_down"
2785
              else:
2786
                val = "ADMIN_down"
2787
        elif field == "oper_ram":
2788
          if instance.primary_node in bad_nodes:
2789
            val = None
2790
          elif instance.name in live_data:
2791
            val = live_data[instance.name].get("memory", "?")
2792
          else:
2793
            val = "-"
2794
        elif field == "disk_template":
2795
          val = instance.disk_template
2796
        elif field == "ip":
2797
          val = instance.nics[0].ip
2798
        elif field == "bridge":
2799
          val = instance.nics[0].bridge
2800
        elif field == "mac":
2801
          val = instance.nics[0].mac
2802
        elif field == "sda_size" or field == "sdb_size":
2803
          disk = instance.FindDisk(field[:3])
2804
          if disk is None:
2805
            val = None
2806
          else:
2807
            val = disk.size
2808
        elif field == "tags":
2809
          val = list(instance.GetTags())
2810
        elif field == "serial_no":
2811
          val = instance.serial_no
2812
        elif field == "network_port":
2813
          val = instance.network_port
2814
        elif field == "hypervisor":
2815
          val = instance.hypervisor
2816
        elif field == "hvparams":
2817
          val = i_hv
2818
        elif (field.startswith(HVPREFIX) and
2819
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2820
          val = i_hv.get(field[len(HVPREFIX):], None)
2821
        elif field == "beparams":
2822
          val = i_be
2823
        elif (field.startswith(BEPREFIX) and
2824
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2825
          val = i_be.get(field[len(BEPREFIX):], None)
2826
        else:
2827
          raise errors.ParameterError(field)
2828
        iout.append(val)
2829
      output.append(iout)
2830

    
2831
    return output
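    # Editor's note on the "status" field computed above:
    #   primary node unreachable        -> "ERROR_nodedown"
    #   running and marked up           -> "running"
    #   running but marked down         -> "ERROR_up"
    #   stopped but marked up           -> "ERROR_down"
    #   stopped and marked down         -> "ADMIN_down"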
2832

    
2833

    
2834
class LUFailoverInstance(LogicalUnit):
2835
  """Failover an instance.
2836

2837
  """
2838
  HPATH = "instance-failover"
2839
  HTYPE = constants.HTYPE_INSTANCE
2840
  _OP_REQP = ["instance_name", "ignore_consistency"]
2841
  REQ_BGL = False
2842

    
2843
  def ExpandNames(self):
2844
    self._ExpandAndLockInstance()
2845
    self.needed_locks[locking.LEVEL_NODE] = []
2846
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2847

    
2848
  def DeclareLocks(self, level):
2849
    if level == locking.LEVEL_NODE:
2850
      self._LockInstancesNodes()
2851

    
2852
  def BuildHooksEnv(self):
2853
    """Build hooks env.
2854

2855
    This runs on master, primary and secondary nodes of the instance.
2856

2857
    """
2858
    env = {
2859
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2860
      }
2861
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2862
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2863
    return env, nl, nl
2864

    
2865
  def CheckPrereq(self):
2866
    """Check prerequisites.
2867

2868
    This checks that the instance is in the cluster.
2869

2870
    """
2871
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2872
    assert self.instance is not None, \
2873
      "Cannot retrieve locked instance %s" % self.op.instance_name
2874

    
2875
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2876
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2877
      raise errors.OpPrereqError("Instance's disk layout is not"
2878
                                 " network mirrored, cannot failover.")
2879

    
2880
    secondary_nodes = instance.secondary_nodes
2881
    if not secondary_nodes:
2882
      raise errors.ProgrammerError("no secondary node but using "
2883
                                   "a mirrored disk template")
2884

    
2885
    target_node = secondary_nodes[0]
2886
    # check memory requirements on the secondary node
2887
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2888
                         instance.name, bep[constants.BE_MEMORY],
2889
                         instance.hypervisor)
2890

    
2891
    # check bridge existence
2892
    brlist = [nic.bridge for nic in instance.nics]
2893
    if not self.rpc.call_bridges_exist(target_node, brlist):
2894
      raise errors.OpPrereqError("One or more target bridges %s does not"
2895
                                 " exist on destination node '%s'" %
2896
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logging.error("Could not shutdown instance %s on node %s. Proceeding"
                      " anyway. Please make sure node %s is down",
                      instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate a unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                      ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
3153
  """Remove all disks for an instance.
3154

3155
  This abstracts away some work from `AddInstance()` and
3156
  `RemoveInstance()`. Note that in case some of the devices couldn't
3157
  be removed, the removal will continue with the other ones (compare
3158
  with `_CreateDisks()`).
3159

3160
  Args:
3161
    instance: the instance object
3162

3163
  Returns:
3164
    True or False showing the success of the removal proces
3165

3166
  """
3167
  logging.info("Removing block devices for instance %s", instance.name)
3168

    
3169
  result = True
3170
  for device in instance.disks:
3171
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3172
      lu.cfg.SetDiskID(disk, node)
3173
      if not lu.rpc.call_blockdev_remove(node, disk):
3174
        logging.error("Could not remove block device %s on node %s,"
3175
                      " continuing anyway", device.iv_name, node)
3176
        result = False
3177

    
3178
  if instance.disk_template == constants.DT_FILE:
3179
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3180
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3181
                                               file_storage_dir):
3182
      logging.error("Could not remove directory '%s'", file_storage_dir)
3183
      result = False
3184

    
3185
  return result
3186

    
3187

    
3188
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3189
  """Compute disk size requirements in the volume group
3190

3191
  This is currently hard-coded for the two-drive layout.
3192

3193
  """
3194
  # Required free disk space as a function of disk and swap space
3195
  req_size_dict = {
3196
    constants.DT_DISKLESS: None,
3197
    constants.DT_PLAIN: disk_size + swap_size,
3198
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3199
    constants.DT_DRBD8: disk_size + swap_size + 256,
3200
    constants.DT_FILE: None,
3201
  }
3202

    
3203
  if disk_template not in req_size_dict:
3204
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3205
                                 " is unknown" %  disk_template)
3206

    
3207
  return req_size_dict[disk_template]
3208

    
3209

    
3210
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_size",
              "disk_template", "swap_size", "mode", "start",
              "wait_for_sync", "ip_check", "mac",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == constants.VALUE_AUTO:
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.proc.LogInfo("Selected nodes for the instance: %s" %
                      (", ".join(ial.nodes),))
    logging.info("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator, ial.nodes)
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

      if self.op.mac == constants.VALUE_AUTO:
        old_name = export_info.get(constants.INISECT_INS, 'name')
        if self.op.instance_name == old_name:
          # FIXME: adjust every nic, when we'll be able to create instances
          # with more than one
          if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
            self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temporary assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image,
                                                cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.proc.LogInfo("Selected new secondary for the instance: %s" %
                      self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))


    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev, new_minor in zip(instance.disks, minors):
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4266
      info("remove logical volumes for %s" % name)
4267
      for lv in old_lvs:
4268
        cfg.SetDiskID(lv, old_node)
4269
        if not self.rpc.call_blockdev_remove(old_node, lv):
4270
          warning("Can't remove LV on old secondary",
4271
                  hint="Cleanup stale volumes by hand")
4272

    
4273
  def Exec(self, feedback_fn):
4274
    """Execute disk replacement.
4275

4276
    This dispatches the disk replacement to the appropriate handler.
4277

4278
    """
4279
    instance = self.instance
4280

    
4281
    # Activate the instance disks if we're replacing them on a down instance
4282
    if instance.status == "down":
4283
      _StartInstanceDisks(self, instance, True)
4284

    
4285
    if instance.disk_template == constants.DT_DRBD8:
4286
      if self.op.remote_node is None:
4287
        fn = self._ExecD8DiskOnly
4288
      else:
4289
        fn = self._ExecD8Secondary
4290
    else:
4291
      raise errors.ProgrammerError("Unhandled disk replacement case")
4292

    
4293
    ret = fn(feedback_fn)
4294

    
4295
    # Deactivate the instance disks if we're replacing them on a down instance
4296
    if instance.status == "down":
4297
      _SafeShutdownInstanceDisks(self, instance)
4298

    
4299
    return ret
4300

    
4301

    
4302
class LUGrowDisk(LogicalUnit):
4303
  """Grow a disk of an instance.
4304

4305
  """
4306
  HPATH = "disk-grow"
4307
  HTYPE = constants.HTYPE_INSTANCE
4308
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4309
  REQ_BGL = False
4310

    
4311
  def ExpandNames(self):
4312
    self._ExpandAndLockInstance()
4313
    self.needed_locks[locking.LEVEL_NODE] = []
4314
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4315

    
4316
  def DeclareLocks(self, level):
4317
    if level == locking.LEVEL_NODE:
4318
      self._LockInstancesNodes()
4319

    
4320
  def BuildHooksEnv(self):
4321
    """Build hooks env.
4322

4323
    This runs on the master, the primary and all the secondaries.
4324

4325
    """
4326
    env = {
4327
      "DISK": self.op.disk,
4328
      "AMOUNT": self.op.amount,
4329
      }
4330
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4331
    nl = [
4332
      self.cfg.GetMasterNode(),
4333
      self.instance.primary_node,
4334
      ]
4335
    return env, nl, nl
4336

    
4337
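  # Note: judging by the error message in CheckPrereq below, the "amount"
  # parameter is expressed in MiB and is checked directly against each
  # involved node's reported 'vg_free' value before any grow is attempted.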
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

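  # Note: the grow loop below treats the call_blockdev_grow result as a
  # two-element (status, message) sequence; anything else, or a false status,
  # aborts the operation for that node.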
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        logging.error("Warning: disk sync-ing has not returned a good"
                      " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

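  # Note: _ComputeDiskStatus returns a plain dict per disk ("iv_name",
  # "dev_type", "logical_id", "physical_id", "pstatus", "sstatus",
  # "children"), recursing into child devices; pstatus/sstatus hold the raw
  # call_blockdev_find results, or None when running in static mode.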
  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

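  # Note: the result built below is keyed by instance name; "config_state"
  # comes from the configuration only (instance.status), while "run_state"
  # is taken from a live instance_info RPC and is therefore None when the
  # query runs in static mode.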
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "hvparams"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.ip, self.bridge, self.mac]
    if (all_parms.count(None) == len(all_parms) and
        not self.op.hvparams and
        not self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
      val = self.op.beparams.get(item, None)
      if val is not None:
        try:
          val = int(val)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
        self.op.beparams[item] = val
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

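    # Note on the merge semantics used below: a parameter explicitly set to
    # None in the opcode deletes that key from the instance-level dict, and
    # cluster.FillDict then layers the cluster defaults on top; hv_new/be_new
    # hold the effective values used for the checks, while hv_inst/be_inst
    # hold only the instance-specific overrides that get stored.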
    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val is None:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

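    # Note: the check below computes the memory that would be missing on the
    # primary node as (requested memory) - (memory already used by this
    # instance, taken as 0 if it is not running) - (memory currently free on
    # the node); a positive result means the instance could no longer start.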
    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return self.rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

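  # Note: the export flow below is: optionally shut the instance down,
  # snapshot the "sda" disk into a temporary LV, copy that snapshot to the
  # target node, finalize the export there, and finally prune any older
  # export of the same instance from the remaining nodes.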
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logging.error("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
      if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance, cluster_name):
        logging.error("Could not export block device %s from node %s to"
                      " node %s", dev.logical_id[1], src_node, dst_node.name)
      if not self.rpc.call_blockdev_remove(src_node, dev):
        logging.error("Could not remove snapshot block device %s from node"
                      " %s", dev.logical_id[1], src_node)

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logging.error("Could not finalize export for instance %s on node %s",
                    instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            logging.error("Could not remove older export for instance %s"
                          " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

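  # Note: the in_data structure assembled below has the top-level keys
  # "version", "cluster_name", "cluster_tags", "enable_hypervisors", "nodes"
  # and "instances"; _AddNewInstance/_AddRelocateInstance later add a
  # "request" key, and the whole dict is serialized into in_text for the
  # external allocator script.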
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }

    i_list = []
    cluster = cfg.GetClusterInfo()
    for iname in cfg.GetInstanceList():
      i_obj = cfg.GetInstanceInfo(iname)
      i_list.append((i_obj, cluster.FillBE(i_obj)))

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    # FIXME: here we have only one hypervisor information, but
    # instance can belong to different hypervisors
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           cfg.GetHypervisorType())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

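  # Note: the iallocator runner RPC is expected to return a 4-tuple of
  # (rcode, stdout, stderr, fail); rcode is compared against the IARUN_*
  # constants, the script's stdout becomes out_text, and _ValidateResult
  # then parses it into out_data.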
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

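  # Note: with direction IALLOCATOR_DIR_IN, Exec below only builds and
  # returns the allocator input text; with IALLOCATOR_DIR_OUT it actually
  # runs the named allocator (without validating its output) and returns
  # the raw result text.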
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result