lib/cmdlib.py (revision 779c15bb)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_BGL = True
69

    
70
  def __init__(self, processor, op, context, rpc):
71
    """Constructor for LogicalUnit.
72

73
    This needs to be overridden in derived classes in order to check op
74
    validity.
75

76
    """
77
    self.proc = processor
78
    self.op = op
79
    self.cfg = context.cfg
80
    self.context = context
81
    self.rpc = rpc
82
    # Dicts used to declare locking needs to mcpu
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    self.add_locks = {}
87
    self.remove_locks = {}
88
    # Used to force good behavior when calling helper functions
89
    self.recalculate_locks = {}
90
    self.__ssh = None
91

    
92
    for attr_name in self._OP_REQP:
93
      attr_val = getattr(op, attr_name, None)
94
      if attr_val is None:
95
        raise errors.OpPrereqError("Required parameter '%s' missing" %
96
                                   attr_name)
97

    
98
    if not self.cfg.IsCluster():
99
      raise errors.OpPrereqError("Cluster not initialized yet,"
100
                                 " use 'gnt-cluster init' first.")
101
    if self.REQ_MASTER:
102
      master = self.cfg.GetMasterNode()
103
      if master != utils.HostInfo().name:
104
        raise errors.OpPrereqError("Commands must be run on the master"
105
                                   " node %s" % master)
106

    
107
  def __GetSSH(self):
108
    """Returns the SshRunner object
109

110
    """
111
    if not self.__ssh:
112
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113
    return self.__ssh
114

    
115
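  # The SshRunner is created lazily by __GetSSH on first use and then cached;
  # it is exposed as a read-only property.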
  ssh = property(fget=__GetSSH)
116

    
117
  def ExpandNames(self):
118
    """Expand names for this LU.
119

120
    This method is called before starting to execute the opcode, and it should
121
    update all the parameters of the opcode to their canonical form (e.g. a
122
    short node name must be fully expanded after this method has successfully
123
    completed). This way locking, hooks, logging, etc. can work correctly.
124

125
    LUs which implement this method must also populate the self.needed_locks
126
    member, as a dict with lock levels as keys, and a list of needed lock names
127
    as values. Rules:
128
      - Use an empty dict if you don't need any lock
129
      - If you don't need any lock at a particular level omit that level
130
      - Don't put anything for the BGL level
131
      - If you want all locks at a level use locking.ALL_SET as a value
132

133
    If you need to share locks (rather than acquire them exclusively) at one
134
    level you can modify self.share_locks, setting a true value (usually 1) for
135
    that level. By default locks are not shared.
136

137
    Examples:
138
    # Acquire all nodes and one instance
139
    self.needed_locks = {
140
      locking.LEVEL_NODE: locking.ALL_SET,
141
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142
    }
143
    # Acquire just two nodes
144
    self.needed_locks = {
145
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146
    }
147
    # Acquire no locks
148
    self.needed_locks = {} # No, you can't leave it to the default value None
149

150
    """
151
    # The implementation of this method is mandatory only if the new LU is
152
    # concurrent, so that old LUs don't need to be changed all at the same
153
    # time.
154
    if self.REQ_BGL:
155
      self.needed_locks = {} # Exclusive LUs don't need locks.
156
    else:
157
      raise NotImplementedError
158

    
159
  def DeclareLocks(self, level):
160
    """Declare LU locking needs for a level
161

162
    While most LUs can just declare their locking needs at ExpandNames time,
163
    sometimes there's the need to calculate some locks after having acquired
164
    the ones before. This function is called just before acquiring locks at a
165
    particular level, but after acquiring the ones at lower levels, and permits
166
    such calculations. It can be used to modify self.needed_locks, and by
167
    default it does nothing.
168

169
    This function is only called if you have something already set in
170
    self.needed_locks for the level.
171

172
    @param level: Locking level which is going to be locked
173
    @type level: member of ganeti.locking.LEVELS
174

175
    """
176

    
177
  def CheckPrereq(self):
178
    """Check prerequisites for this LU.
179

180
    This method should check that the prerequisites for the execution
181
    of this LU are fulfilled. It can do internode communication, but
182
    it should be idempotent - no cluster or system changes are
183
    allowed.
184

185
    The method should raise errors.OpPrereqError in case something is
186
    not fulfilled. Its return value is ignored.
187

188
    This method should also update all the parameters of the opcode to
189
    their canonical form if it hasn't been done by ExpandNames before.
190

191
    """
192
    raise NotImplementedError
193

    
194
  def Exec(self, feedback_fn):
195
    """Execute the LU.
196

197
    This method should implement the actual work. It should raise
198
    errors.OpExecError for failures that are somewhat dealt with in
199
    code, or expected.
200

201
    """
202
    raise NotImplementedError
203

    
204
  def BuildHooksEnv(self):
205
    """Build hooks environment for this LU.
206

207
    This method should return a three-element tuple consisting of: a dict
208
    containing the environment that will be used for running the
209
    specific hook for this LU, a list of node names on which the hook
210
    should run before the execution, and a list of node names on which
211
    the hook should run after the execution.
212

213
    The keys of the dict must not have 'GANETI_' prefixed as this will
214
    be handled in the hooks runner. Also note additional keys will be
215
    added by the hooks runner. If the LU doesn't define any
216
    environment, an empty dict (and not None) should be returned.
217

218
    No nodes should be returned as an empty list (and not None).
219

220
    Note that if the HPATH for a LU class is None, this function will
221
    not be called.
222

223
    """
224
    raise NotImplementedError
225

    
226
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227
    """Notify the LU about the results of its hooks.
228

229
    This method is called every time a hooks phase is executed, and notifies
230
    the Logical Unit about the hooks' result. The LU can then use it to alter
231
    its result based on the hooks.  By default the method does nothing and the
232
    previous result is passed back unchanged but any LU can define it if it
233
    wants to use the local cluster hook-scripts somehow.
234

235
    Args:
236
      phase: the hooks phase that has just been run
237
      hook_results: the results of the multi-node hooks rpc call
238
      feedback_fn: function to send feedback back to the caller
239
      lu_result: the previous result this LU had, or None in the PRE phase.
240

241
    """
242
    return lu_result
243

    
244
  def _ExpandAndLockInstance(self):
245
    """Helper function to expand and lock an instance.
246

247
    Many LUs that work on an instance take its name in self.op.instance_name
248
    and need to expand it and then declare the expanded name for locking. This
249
    function does it, and then updates self.op.instance_name to the expanded
250
    name. It also initializes needed_locks as a dict, if this hasn't been done
251
    before.
252

253
    """
254
    if self.needed_locks is None:
255
      self.needed_locks = {}
256
    else:
257
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258
        "_ExpandAndLockInstance called with instance-level locks set"
259
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260
    if expanded_name is None:
261
      raise errors.OpPrereqError("Instance '%s' not known" %
262
                                  self.op.instance_name)
263
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264
    self.op.instance_name = expanded_name
265

    
266
  def _LockInstancesNodes(self, primary_only=False):
267
    """Helper function to declare instances' nodes for locking.
268

269
    This function should be called after locking one or more instances to lock
270
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271
    with all primary or secondary nodes for instances already locked and
272
    present in self.needed_locks[locking.LEVEL_INSTANCE].
273

274
    It should be called from DeclareLocks, and for safety only works if
275
    self.recalculate_locks[locking.LEVEL_NODE] is set.
276

277
    In the future it may grow parameters to just lock some instance's nodes, or
278
    to just lock primaries or secondary nodes, if needed.
279

280
    It should be called from DeclareLocks in a way similar to:
281

282
    if level == locking.LEVEL_NODE:
283
      self._LockInstancesNodes()
284

285
    @type primary_only: boolean
286
    @param primary_only: only lock primary nodes of locked instances
287

288
    """
289
    assert locking.LEVEL_NODE in self.recalculate_locks, \
290
      "_LockInstancesNodes helper function called with no nodes to recalculate"
291

    
292
    # TODO: check if we've really been called with the instance locks held
293

    
294
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
295
    # future we might want to have different behaviors depending on the value
296
    # of self.recalculate_locks[locking.LEVEL_NODE]
297
    wanted_nodes = []
298
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
299
      instance = self.context.cfg.GetInstanceInfo(instance_name)
300
      wanted_nodes.append(instance.primary_node)
301
      if not primary_only:
302
        wanted_nodes.extend(instance.secondary_nodes)
303

    
304
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
305
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
306
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
307
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
308

    
309
    del self.recalculate_locks[locking.LEVEL_NODE]
310

    
311

    
312
class NoHooksLU(LogicalUnit):
313
  """Simple LU which runs no hooks.
314

315
  This LU is intended as a parent for other LogicalUnits which will
316
  run no hooks, in order to reduce duplicate code.
317

318
  """
319
  HPATH = None
320
  HTYPE = None
321

    
322

    
323
def _GetWantedNodes(lu, nodes):
324
  """Returns list of checked and expanded node names.
325

326
  Args:
327
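    lu: the LogicalUnit on whose behalf we operate (only lu.cfg is used)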
    nodes: List of nodes (strings) or None for all
328

329
  """
330
  if not isinstance(nodes, list):
331
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
332

    
333
  if not nodes:
334
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
335
      " non-empty list of nodes whose name is to be expanded.")
336

    
337
  wanted = []
338
  for name in nodes:
339
    node = lu.cfg.ExpandNodeName(name)
340
    if node is None:
341
      raise errors.OpPrereqError("No such node name '%s'" % name)
342
    wanted.append(node)
343

    
344
  return utils.NiceSort(wanted)
345

    
346

    
347
def _GetWantedInstances(lu, instances):
348
  """Returns list of checked and expanded instance names.
349

350
  Args:
351
    instances: List of instances (strings) or None for all
352

353
  """
354
  if not isinstance(instances, list):
355
    raise errors.OpPrereqError("Invalid argument type 'instances'")
356

    
357
  if instances:
358
    wanted = []
359

    
360
    for name in instances:
361
      instance = lu.cfg.ExpandInstanceName(name)
362
      if instance is None:
363
        raise errors.OpPrereqError("No such instance name '%s'" % name)
364
      wanted.append(instance)
365

    
366
  else:
367
    wanted = lu.cfg.GetInstanceList()
368
  return utils.NiceSort(wanted)
369

    
370

    
371
def _CheckOutputFields(static, dynamic, selected):
372
  """Checks whether all selected fields are valid.
373

374
  Args:
375
    static: Static fields
376
    dynamic: Dynamic fields
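    selected: Fields requested by the caller, checked against static | dynamic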
377

378
  """
379
  static_fields = frozenset(static)
380
  dynamic_fields = frozenset(dynamic)
381

    
382
  all_fields = static_fields | dynamic_fields
383

    
384
  if not all_fields.issuperset(selected):
385
    raise errors.OpPrereqError("Unknown output fields selected: %s"
386
                               % ",".join(frozenset(selected).
387
                                          difference(all_fields)))
388

    
389

    
390
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391
                          memory, vcpus, nics):
392
  """Builds instance related env variables for hooks from single variables.
393

394
  Args:
395
    secondary_nodes: List of secondary nodes as strings
396
  """
397
  env = {
398
    "OP_TARGET": name,
399
    "INSTANCE_NAME": name,
400
    "INSTANCE_PRIMARY": primary_node,
401
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402
    "INSTANCE_OS_TYPE": os_type,
403
    "INSTANCE_STATUS": status,
404
    "INSTANCE_MEMORY": memory,
405
    "INSTANCE_VCPUS": vcpus,
406
  }
407

    
408
  if nics:
409
    nic_count = len(nics)
410
    for idx, (ip, bridge, mac) in enumerate(nics):
411
      if ip is None:
412
        ip = ""
413
      env["INSTANCE_NIC%d_IP" % idx] = ip
414
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
416
  else:
417
    nic_count = 0
418

    
419
  env["INSTANCE_NIC_COUNT"] = nic_count
420

    
421
  return env
422

    
423

    
424
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
425
  """Builds instance related env variables for hooks from an object.
426

427
  Args:
428
    instance: objects.Instance object of instance
429
    override: dict of values to override
430
  """
431
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
432
  args = {
433
    'name': instance.name,
434
    'primary_node': instance.primary_node,
435
    'secondary_nodes': instance.secondary_nodes,
436
    'os_type': instance.os,
437
    'status': instance.status,
438
    'memory': bep[constants.BE_MEMORY],
439
    'vcpus': bep[constants.BE_VCPUS],
440
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441
  }
442
  if override:
443
    args.update(override)
444
  return _BuildInstanceHookEnv(**args)
445

    
446

    
447
def _CheckInstanceBridgesExist(lu, instance):
448
  """Check that the brigdes needed by an instance exist.
449

450
  """
451
  # check bridges existence
452
  brlist = [nic.bridge for nic in instance.nics]
453
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
454
    raise errors.OpPrereqError("one or more target bridges %s does not"
455
                               " exist on destination node '%s'" %
456
                               (brlist, instance.primary_node))
457

    
458

    
459
class LUDestroyCluster(NoHooksLU):
460
  """Logical unit for destroying the cluster.
461

462
  """
463
  _OP_REQP = []
464

    
465
  def CheckPrereq(self):
466
    """Check prerequisites.
467

468
    This checks whether the cluster is empty.
469

470
    Any errors are signalled by raising errors.OpPrereqError.
471

472
    """
473
    master = self.cfg.GetMasterNode()
474

    
475
    nodelist = self.cfg.GetNodeList()
476
    if len(nodelist) != 1 or nodelist[0] != master:
477
      raise errors.OpPrereqError("There are still %d node(s) in"
478
                                 " this cluster." % (len(nodelist) - 1))
479
    instancelist = self.cfg.GetInstanceList()
480
    if instancelist:
481
      raise errors.OpPrereqError("There are still %d instance(s) in"
482
                                 " this cluster." % len(instancelist))
483

    
484
  def Exec(self, feedback_fn):
485
    """Destroys the cluster.
486

487
    """
488
    master = self.cfg.GetMasterNode()
489
    if not self.rpc.call_node_stop_master(master, False):
490
      raise errors.OpExecError("Could not disable the master role")
491
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492
    utils.CreateBackup(priv_key)
493
    utils.CreateBackup(pub_key)
494
    return master
495

    
496

    
497
class LUVerifyCluster(LogicalUnit):
498
  """Verifies the cluster status.
499

500
  """
501
  HPATH = "cluster-verify"
502
  HTYPE = constants.HTYPE_CLUSTER
503
  _OP_REQP = ["skip_checks"]
504
  REQ_BGL = False
505

    
506
  def ExpandNames(self):
507
    self.needed_locks = {
508
      locking.LEVEL_NODE: locking.ALL_SET,
509
      locking.LEVEL_INSTANCE: locking.ALL_SET,
510
    }
511
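    # cluster verification only reads the configuration, so all locks are
    # acquired in shared mode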
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512

    
513
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514
                  remote_version, feedback_fn):
515
    """Run multiple tests against a node.
516

517
    Test list:
518
      - compares ganeti version
519
      - checks vg existence and size > 20G
520
      - checks config file checksum
521
      - checks ssh to other nodes
522

523
    Args:
524
      node: name of the node to check
525
      file_list: required list of files
526
      local_cksum: dictionary of local files and their checksums
527

528
    """
529
    # compares ganeti version
530
    local_version = constants.PROTOCOL_VERSION
531
    if not remote_version:
532
      feedback_fn("  - ERROR: connection to %s failed" % (node))
533
      return True
534

    
535
    if local_version != remote_version:
536
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537
                      (local_version, node, remote_version))
538
      return True
539

    
540
    # checks vg existence and size > 20G
541

    
542
    bad = False
543
    if not vglist:
544
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545
                      (node,))
546
      bad = True
547
    else:
548
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549
                                            constants.MIN_VG_SIZE)
550
      if vgstatus:
551
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552
        bad = True
553

    
554
    if not node_result:
555
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
556
      return True
557

    
558
    # checks config file checksum
559
    # checks ssh to any
560

    
561
    if 'filelist' not in node_result:
562
      bad = True
563
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
564
    else:
565
      remote_cksum = node_result['filelist']
566
      for file_name in file_list:
567
        if file_name not in remote_cksum:
568
          bad = True
569
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
570
        elif remote_cksum[file_name] != local_cksum[file_name]:
571
          bad = True
572
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
573

    
574
    if 'nodelist' not in node_result:
575
      bad = True
576
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
577
    else:
578
      if node_result['nodelist']:
579
        bad = True
580
        for node in node_result['nodelist']:
581
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
582
                          (node, node_result['nodelist'][node]))
583
    if 'node-net-test' not in node_result:
584
      bad = True
585
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
586
    else:
587
      if node_result['node-net-test']:
588
        bad = True
589
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
590
        for node in nlist:
591
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
592
                          (node, node_result['node-net-test'][node]))
593

    
594
    hyp_result = node_result.get('hypervisor', None)
595
    if isinstance(hyp_result, dict):
596
      for hv_name, hv_result in hyp_result.iteritems():
597
        if hv_result is not None:
598
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
599
                      (hv_name, hv_result))
600
    return bad
601

    
602
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
603
                      node_instance, feedback_fn):
604
    """Verify an instance.
605

606
    This function checks to see if the required block devices are
607
    available on the instance's node.
608

609
    """
610
    bad = False
611

    
612
    node_current = instanceconfig.primary_node
613

    
614
    node_vol_should = {}
615
    instanceconfig.MapLVsByNode(node_vol_should)
616

    
617
    for node in node_vol_should:
618
      for volume in node_vol_should[node]:
619
        if node not in node_vol_is or volume not in node_vol_is[node]:
620
          feedback_fn("  - ERROR: volume %s missing on node %s" %
621
                          (volume, node))
622
          bad = True
623

    
624
    if not instanceconfig.status == 'down':
625
      if (node_current not in node_instance or
626
          not instance in node_instance[node_current]):
627
        feedback_fn("  - ERROR: instance %s not running on node %s" %
628
                        (instance, node_current))
629
        bad = True
630

    
631
    for node in node_instance:
632
      if (not node == node_current):
633
        if instance in node_instance[node]:
634
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
635
                          (instance, node))
636
          bad = True
637

    
638
    return bad
639

    
640
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
641
    """Verify if there are any unknown volumes in the cluster.
642

643
    The .os, .swap and backup volumes are ignored. All other volumes are
644
    reported as unknown.
645

646
    """
647
    bad = False
648

    
649
    for node in node_vol_is:
650
      for volume in node_vol_is[node]:
651
        if node not in node_vol_should or volume not in node_vol_should[node]:
652
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
653
                      (volume, node))
654
          bad = True
655
    return bad
656

    
657
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
658
    """Verify the list of running instances.
659

660
    This checks what instances are running but unknown to the cluster.
661

662
    """
663
    bad = False
664
    for node in node_instance:
665
      for runninginstance in node_instance[node]:
666
        if runninginstance not in instancelist:
667
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
668
                          (runninginstance, node))
669
          bad = True
670
    return bad
671

    
672
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
673
    """Verify N+1 Memory Resilience.
674

675
    Check that if one single node dies we can still start all the instances it
676
    was primary for.
677

678
    """
679
    bad = False
680

    
681
    for node, nodeinfo in node_info.iteritems():
682
      # This code checks that every node which is now listed as secondary has
683
      # enough memory to host all the instances it is secondary for, should a single
684
      # other node in the cluster fail.
685
      # FIXME: not ready for failover to an arbitrary node
686
      # FIXME: does not support file-backed instances
687
      # WARNING: we currently take into account down instances as well as up
688
      # ones, considering that even if they're down someone might want to start
689
      # them even in the event of a node failure.
690
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
691
        needed_mem = 0
692
        for instance in instances:
693
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
694
          if bep[constants.BE_AUTO_BALANCE]:
695
            needed_mem += bep[constants.BE_MEMORY]
696
        if nodeinfo['mfree'] < needed_mem:
697
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
698
                      " failovers should node %s fail" % (node, prinode))
699
          bad = True
700
    return bad
701

    
702
  def CheckPrereq(self):
703
    """Check prerequisites.
704

705
    Transform the list of checks we're going to skip into a set and check that
706
    all its members are valid.
707

708
    """
709
    self.skip_set = frozenset(self.op.skip_checks)
710
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
711
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
712

    
713
  def BuildHooksEnv(self):
714
    """Build hooks env.
715

716
    Cluster-Verify hooks are run only in the post phase; their failure causes
717
    the output to be logged in the verify output and the verification to fail.
718

719
    """
720
    all_nodes = self.cfg.GetNodeList()
721
    # TODO: populate the environment with useful information for verify hooks
722
    env = {}
723
    return env, [], all_nodes
724

    
725
  def Exec(self, feedback_fn):
726
    """Verify integrity of cluster, performing various test on nodes.
727

728
    """
729
    bad = False
730
    feedback_fn("* Verifying global settings")
731
    for msg in self.cfg.VerifyConfig():
732
      feedback_fn("  - ERROR: %s" % msg)
733

    
734
    vg_name = self.cfg.GetVGName()
735
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
736
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
737
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
738
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
739
    i_non_redundant = [] # Non redundant instances
740
    i_non_a_balanced = [] # Non auto-balanced instances
741
    node_volume = {}
742
    node_instance = {}
743
    node_info = {}
744
    instance_cfg = {}
745

    
746
    # FIXME: verify OS list
747
    # do local checksums
748
    file_names = []
749
    file_names.append(constants.SSL_CERT_FILE)
750
    file_names.append(constants.CLUSTER_CONF_FILE)
751
    local_checksums = utils.FingerprintFiles(file_names)
752

    
753
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
754
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
755
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
756
    all_vglist = self.rpc.call_vg_list(nodelist)
757
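    # what call_node_verify should check on every node: checksums of the
    # listed files, ssh and tcp reachability of the other nodes, and
    # hypervisor status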
    node_verify_param = {
758
      'filelist': file_names,
759
      'nodelist': nodelist,
760
      'hypervisor': hypervisors,
761
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
762
                        for node in nodeinfo]
763
      }
764
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
765
                                           self.cfg.GetClusterName())
766
    all_rversion = self.rpc.call_version(nodelist)
767
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
768
                                        self.cfg.GetHypervisorType())
769

    
770
    cluster = self.cfg.GetClusterInfo()
771
    for node in nodelist:
772
      feedback_fn("* Verifying node %s" % node)
773
      result = self._VerifyNode(node, file_names, local_checksums,
774
                                all_vglist[node], all_nvinfo[node],
775
                                all_rversion[node], feedback_fn)
776
      bad = bad or result
777

    
778
      # node_volume
779
      volumeinfo = all_volumeinfo[node]
780

    
781
      if isinstance(volumeinfo, basestring):
782
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
783
                    (node, volumeinfo[-400:].encode('string_escape')))
784
        bad = True
785
        node_volume[node] = {}
786
      elif not isinstance(volumeinfo, dict):
787
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
788
        bad = True
789
        continue
790
      else:
791
        node_volume[node] = volumeinfo
792

    
793
      # node_instance
794
      nodeinstance = all_instanceinfo[node]
795
      if type(nodeinstance) != list:
796
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
797
        bad = True
798
        continue
799

    
800
      node_instance[node] = nodeinstance
801

    
802
      # node_info
803
      nodeinfo = all_ninfo[node]
804
      if not isinstance(nodeinfo, dict):
805
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
806
        bad = True
807
        continue
808

    
809
      try:
810
        node_info[node] = {
811
          "mfree": int(nodeinfo['memory_free']),
812
          "dfree": int(nodeinfo['vg_free']),
813
          "pinst": [],
814
          "sinst": [],
815
          # dictionary holding all instances this node is secondary for,
816
          # grouped by their primary node. Each key is a cluster node, and each
817
          # value is a list of instances which have the key as primary and the
818
          # current node as secondary.  this is handy to calculate N+1 memory
819
          # availability if you can only failover from a primary to its
820
          # secondary.
821
          "sinst-by-pnode": {},
822
        }
823
      except ValueError:
824
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
825
        bad = True
826
        continue
827

    
828
    node_vol_should = {}
829

    
830
    for instance in instancelist:
831
      feedback_fn("* Verifying instance %s" % instance)
832
      inst_config = self.cfg.GetInstanceInfo(instance)
833
      result =  self._VerifyInstance(instance, inst_config, node_volume,
834
                                     node_instance, feedback_fn)
835
      bad = bad or result
836

    
837
      inst_config.MapLVsByNode(node_vol_should)
838

    
839
      instance_cfg[instance] = inst_config
840

    
841
      pnode = inst_config.primary_node
842
      if pnode in node_info:
843
        node_info[pnode]['pinst'].append(instance)
844
      else:
845
        feedback_fn("  - ERROR: instance %s, connection to primary node"
846
                    " %s failed" % (instance, pnode))
847
        bad = True
848

    
849
      # If the instance is non-redundant we cannot survive losing its primary
850
      # node, so we are not N+1 compliant. On the other hand we have no disk
851
      # templates with more than one secondary so that situation is not well
852
      # supported either.
853
      # FIXME: does not support file-backed instances
854
      if len(inst_config.secondary_nodes) == 0:
855
        i_non_redundant.append(instance)
856
      elif len(inst_config.secondary_nodes) > 1:
857
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
858
                    % instance)
859

    
860
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
861
        i_non_a_balanced.append(instance)
862

    
863
      for snode in inst_config.secondary_nodes:
864
        if snode in node_info:
865
          node_info[snode]['sinst'].append(instance)
866
          if pnode not in node_info[snode]['sinst-by-pnode']:
867
            node_info[snode]['sinst-by-pnode'][pnode] = []
868
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
869
        else:
870
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
871
                      " %s failed" % (instance, snode))
872

    
873
    feedback_fn("* Verifying orphan volumes")
874
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
875
                                       feedback_fn)
876
    bad = bad or result
877

    
878
    feedback_fn("* Verifying remaining instances")
879
    result = self._VerifyOrphanInstances(instancelist, node_instance,
880
                                         feedback_fn)
881
    bad = bad or result
882

    
883
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
884
      feedback_fn("* Verifying N+1 Memory redundancy")
885
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
886
      bad = bad or result
887

    
888
    feedback_fn("* Other Notes")
889
    if i_non_redundant:
890
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
891
                  % len(i_non_redundant))
892

    
893
    if i_non_a_balanced:
894
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
895
                  % len(i_non_a_balanced))
896

    
897
    return not bad
898

    
899
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
900
    """Analize the post-hooks' result, handle it, and send some
901
    nicely-formatted feedback back to the user.
902

903
    Args:
904
      phase: the hooks phase that has just been run
905
      hooks_results: the results of the multi-node hooks rpc call
906
      feedback_fn: function to send feedback back to the caller
907
      lu_result: previous Exec result
908

909
    """
910
    # We only really run POST phase hooks, and are only interested in
911
    # their results
912
    if phase == constants.HOOKS_PHASE_POST:
913
      # Used to change hooks' output to proper indentation
914
      indent_re = re.compile('^', re.M)
915
      feedback_fn("* Hooks Results")
916
      if not hooks_results:
917
        feedback_fn("  - ERROR: general communication failure")
918
        lu_result = 1
919
      else:
920
        for node_name in hooks_results:
921
          show_node_header = True
922
          res = hooks_results[node_name]
923
          if res is False or not isinstance(res, list):
924
            feedback_fn("    Communication failure")
925
            lu_result = 1
926
            continue
927
          for script, hkr, output in res:
928
            if hkr == constants.HKR_FAIL:
929
              # The node header is only shown once, if there are
930
              # failing hooks on that node
931
              if show_node_header:
932
                feedback_fn("  Node %s:" % node_name)
933
                show_node_header = False
934
              feedback_fn("    ERROR: Script %s failed, output:" % script)
935
              output = indent_re.sub('      ', output)
936
              feedback_fn("%s" % output)
937
              lu_result = 1
938

    
939
      return lu_result
940

    
941

    
942
class LUVerifyDisks(NoHooksLU):
943
  """Verifies the cluster disks status.
944

945
  """
946
  _OP_REQP = []
947
  REQ_BGL = False
948

    
949
  def ExpandNames(self):
950
    self.needed_locks = {
951
      locking.LEVEL_NODE: locking.ALL_SET,
952
      locking.LEVEL_INSTANCE: locking.ALL_SET,
953
    }
954
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
955

    
956
  def CheckPrereq(self):
957
    """Check prerequisites.
958

959
    This has no prerequisites.
960

961
    """
962
    pass
963

    
964
  def Exec(self, feedback_fn):
965
    """Verify integrity of cluster disks.
966

967
    """
968
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
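    # "result" is a tuple of the four containers above, so whatever is
    # appended to res_nodes, res_nlvm, res_instances or res_missing below
    # also ends up in the returned value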
969

    
970
    vg_name = self.cfg.GetVGName()
971
    nodes = utils.NiceSort(self.cfg.GetNodeList())
972
    instances = [self.cfg.GetInstanceInfo(name)
973
                 for name in self.cfg.GetInstanceList()]
974

    
975
    nv_dict = {}
976
    for inst in instances:
977
      inst_lvs = {}
978
      if (inst.status != "up" or
979
          inst.disk_template not in constants.DTS_NET_MIRROR):
980
        continue
981
      inst.MapLVsByNode(inst_lvs)
982
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
983
      for node, vol_list in inst_lvs.iteritems():
984
        for vol in vol_list:
985
          nv_dict[(node, vol)] = inst
986

    
987
    if not nv_dict:
988
      return result
989

    
990
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
991

    
992
    to_act = set()
993
    for node in nodes:
994
      # node_volume
995
      lvs = node_lvs[node]
996

    
997
      if isinstance(lvs, basestring):
998
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
999
        res_nlvm[node] = lvs
1000
      elif not isinstance(lvs, dict):
1001
        logger.Info("connection to node %s failed or invalid data returned" %
1002
                    (node,))
1003
        res_nodes.append(node)
1004
        continue
1005

    
1006
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1007
        inst = nv_dict.pop((node, lv_name), None)
1008
        if (not lv_online and inst is not None
1009
            and inst.name not in res_instances):
1010
          res_instances.append(inst.name)
1011

    
1012
    # any leftover items in nv_dict are missing LVs, let's arrange the
1013
    # data better
1014
    for key, inst in nv_dict.iteritems():
1015
      if inst.name not in res_missing:
1016
        res_missing[inst.name] = []
1017
      res_missing[inst.name].append(key)
1018

    
1019
    return result
1020

    
1021

    
1022
class LURenameCluster(LogicalUnit):
1023
  """Rename the cluster.
1024

1025
  """
1026
  HPATH = "cluster-rename"
1027
  HTYPE = constants.HTYPE_CLUSTER
1028
  _OP_REQP = ["name"]
1029

    
1030
  def BuildHooksEnv(self):
1031
    """Build hooks env.
1032

1033
    """
1034
    env = {
1035
      "OP_TARGET": self.cfg.GetClusterName(),
1036
      "NEW_NAME": self.op.name,
1037
      }
1038
    mn = self.cfg.GetMasterNode()
1039
    return env, [mn], [mn]
1040

    
1041
  def CheckPrereq(self):
1042
    """Verify that the passed name is a valid one.
1043

1044
    """
1045
    hostname = utils.HostInfo(self.op.name)
1046

    
1047
    new_name = hostname.name
1048
    self.ip = new_ip = hostname.ip
1049
    old_name = self.cfg.GetClusterName()
1050
    old_ip = self.cfg.GetMasterIP()
1051
    if new_name == old_name and new_ip == old_ip:
1052
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1053
                                 " cluster has changed")
1054
    if new_ip != old_ip:
1055
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1056
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1057
                                   " reachable on the network. Aborting." %
1058
                                   new_ip)
1059

    
1060
    self.op.name = new_name
1061

    
1062
  def Exec(self, feedback_fn):
1063
    """Rename the cluster.
1064

1065
    """
1066
    clustername = self.op.name
1067
    ip = self.ip
1068

    
1069
    # shutdown the master IP
1070
    master = self.cfg.GetMasterNode()
1071
    if not self.rpc.call_node_stop_master(master, False):
1072
      raise errors.OpExecError("Could not disable the master role")
1073

    
1074
    try:
1075
      # modify the sstore
1076
      # TODO: sstore
1077
      ss.SetKey(ss.SS_MASTER_IP, ip)
1078
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1079

    
1080
      # Distribute updated ss config to all nodes
1081
      myself = self.cfg.GetNodeInfo(master)
1082
      dist_nodes = self.cfg.GetNodeList()
1083
      if myself.name in dist_nodes:
1084
        dist_nodes.remove(myself.name)
1085

    
1086
      logger.Debug("Copying updated ssconf data to all nodes")
1087
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1088
        fname = ss.KeyToFilename(keyname)
1089
        result = self.rpc.call_upload_file(dist_nodes, fname)
1090
        for to_node in dist_nodes:
1091
          if not result[to_node]:
1092
            logger.Error("copy of file %s to node %s failed" %
1093
                         (fname, to_node))
1094
    finally:
1095
      if not self.rpc.call_node_start_master(master, False):
1096
        logger.Error("Could not re-enable the master role on the master,"
1097
                     " please restart manually.")
1098

    
1099

    
1100
def _RecursiveCheckIfLVMBased(disk):
1101
  """Check if the given disk or its children are lvm-based.
1102

1103
  Args:
1104
    disk: ganeti.objects.Disk object
1105

1106
  Returns:
1107
    boolean indicating whether a LD_LV dev_type was found or not
1108

1109
  """
1110
  if disk.children:
1111
    for chdisk in disk.children:
1112
      if _RecursiveCheckIfLVMBased(chdisk):
1113
        return True
1114
  return disk.dev_type == constants.LD_LV
1115

    
1116

    
1117
class LUSetClusterParams(LogicalUnit):
1118
  """Change the parameters of the cluster.
1119

1120
  """
1121
  HPATH = "cluster-modify"
1122
  HTYPE = constants.HTYPE_CLUSTER
1123
  _OP_REQP = []
1124
  REQ_BGL = False
1125

    
1126
  def ExpandNames(self):
1127
    # FIXME: in the future maybe other cluster params won't require checking on
1128
    # all nodes to be modified.
1129
    self.needed_locks = {
1130
      locking.LEVEL_NODE: locking.ALL_SET,
1131
    }
1132
    self.share_locks[locking.LEVEL_NODE] = 1
1133

    
1134
  def BuildHooksEnv(self):
1135
    """Build hooks env.
1136

1137
    """
1138
    env = {
1139
      "OP_TARGET": self.cfg.GetClusterName(),
1140
      "NEW_VG_NAME": self.op.vg_name,
1141
      }
1142
    mn = self.cfg.GetMasterNode()
1143
    return env, [mn], [mn]
1144

    
1145
  def CheckPrereq(self):
1146
    """Check prerequisites.
1147

1148
    This checks whether the given params don't conflict and
1149
    if the given volume group is valid.
1150

1151
    """
1152
    # FIXME: This only works because there is only one parameter that can be
1153
    # changed or removed.
1154
    if self.op.vg_name is not None and not self.op.vg_name:
1155
      instances = self.cfg.GetAllInstancesInfo().values()
1156
      for inst in instances:
1157
        for disk in inst.disks:
1158
          if _RecursiveCheckIfLVMBased(disk):
1159
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1160
                                       " lvm-based instances exist")
1161

    
1162
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1163

    
1164
    # if vg_name not None, checks given volume group on all nodes
1165
    if self.op.vg_name:
1166
      vglist = self.rpc.call_vg_list(node_list)
1167
      for node in node_list:
1168
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1169
                                              constants.MIN_VG_SIZE)
1170
        if vgstatus:
1171
          raise errors.OpPrereqError("Error on node '%s': %s" %
1172
                                     (node, vgstatus))
1173

    
1174
    self.cluster = cluster = self.cfg.GetClusterInfo()
1175
    # beparams changes do not need validation (we can't validate?),
1176
    # but we still process here
1177
    if self.op.beparams:
1178
      self.new_beparams = cluster.FillDict(
1179
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1180

    
1181
    # hypervisor list/parameters
1182
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1183
    if self.op.hvparams:
1184
      if not isinstance(self.op.hvparams, dict):
1185
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1186
      for hv_name, hv_dict in self.op.hvparams.items():
1187
        if hv_name not in self.new_hvparams:
1188
          self.new_hvparams[hv_name] = hv_dict
1189
        else:
1190
          self.new_hvparams[hv_name].update(hv_dict)
1191

    
1192
    if self.op.enabled_hypervisors is not None:
1193
      self.hv_list = self.op.enabled_hypervisors
1194
    else:
1195
      self.hv_list = cluster.enabled_hypervisors
1196

    
1197
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1198
      # either the enabled list has changed, or the parameters have, validate
1199
      for hv_name, hv_params in self.new_hvparams.items():
1200
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1201
            (self.op.enabled_hypervisors and
1202
             hv_name in self.op.enabled_hypervisors)):
1203
          # either this is a new hypervisor, or its parameters have changed
1204
          hv_class = hypervisor.GetHypervisor(hv_name)
1205
          hv_class.CheckParameterSyntax(hv_params)
1206
          _CheckHVParams(self, node_list, hv_name, hv_params)
1207

    
1208
  def Exec(self, feedback_fn):
1209
    """Change the parameters of the cluster.
1210

1211
    """
1212
    if self.op.vg_name is not None:
1213
      if self.op.vg_name != self.cfg.GetVGName():
1214
        self.cfg.SetVGName(self.op.vg_name)
1215
      else:
1216
        feedback_fn("Cluster LVM configuration already in desired"
1217
                    " state, not changing")
1218
    if self.op.hvparams:
1219
      self.cluster.hvparams = self.new_hvparams
1220
    if self.op.enabled_hypervisors is not None:
1221
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1222
    if self.op.beparams:
1223
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1224
    self.cfg.Update(self.cluster)
1225

    
1226

    
1227
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1228
  """Sleep and poll for an instance's disk to sync.
1229

1230
  """
1231
  if not instance.disks:
1232
    return True
1233

    
1234
  if not oneshot:
1235
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1236

    
1237
  node = instance.primary_node
1238

    
1239
  for dev in instance.disks:
1240
    lu.cfg.SetDiskID(dev, node)
1241

    
1242
  retries = 0
1243
  while True:
1244
    max_time = 0
1245
    done = True
1246
    cumul_degraded = False
1247
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1248
    if not rstats:
1249
      lu.proc.LogWarning("Can't get any data from node %s" % node)
1250
      retries += 1
1251
      if retries >= 10:
1252
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1253
                                 " aborting." % node)
1254
      time.sleep(6)
1255
      continue
1256
    retries = 0
1257
    for i in range(len(rstats)):
1258
      mstat = rstats[i]
1259
      if mstat is None:
1260
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
1261
                           (node, instance.disks[i].iv_name))
1262
        continue
1263
      # we ignore the ldisk parameter
1264
      perc_done, est_time, is_degraded, _ = mstat
1265
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1266
      if perc_done is not None:
1267
        done = False
1268
        if est_time is not None:
1269
          rem_time = "%d estimated seconds remaining" % est_time
1270
          max_time = est_time
1271
        else:
1272
          rem_time = "no time estimate"
1273
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1274
                        (instance.disks[i].iv_name, perc_done, rem_time))
1275
    if done or oneshot:
1276
      break
1277

    
1278
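    # wait before the next poll: the reported time estimate, capped at 60s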
    time.sleep(min(60, max_time))
1279

    
1280
  if done:
1281
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1282
  return not cumul_degraded
1283

    
1284

    
1285
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1286
  """Check that mirrors are not degraded.
1287

1288
  The ldisk parameter, if True, will change the test from the
1289
  is_degraded attribute (which represents overall non-ok status for
1290
  the device(s)) to the ldisk (representing the local storage status).
1291

1292
  """
1293
  lu.cfg.SetDiskID(dev, node)
1294
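  # call_blockdev_find returns a status tuple for the device; index 5 holds
  # the overall is_degraded flag, index 6 the local-disk (ldisk) status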
  if ldisk:
1295
    idx = 6
1296
  else:
1297
    idx = 5
1298

    
1299
  result = True
1300
  if on_primary or dev.AssembleOnSecondary():
1301
    rstats = lu.rpc.call_blockdev_find(node, dev)
1302
    if not rstats:
1303
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1304
      result = False
1305
    else:
1306
      result = result and (not rstats[idx])
1307
  if dev.children:
1308
    for child in dev.children:
1309
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1310

    
1311
  return result
1312

    
1313

    
1314
class LUDiagnoseOS(NoHooksLU):
1315
  """Logical unit for OS diagnose/query.
1316

1317
  """
1318
  _OP_REQP = ["output_fields", "names"]
1319
  REQ_BGL = False
1320

    
1321
  def ExpandNames(self):
1322
    if self.op.names:
1323
      raise errors.OpPrereqError("Selective OS query not supported")
1324

    
1325
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1326
    _CheckOutputFields(static=[],
1327
                       dynamic=self.dynamic_fields,
1328
                       selected=self.op.output_fields)
1329

    
1330
    # Lock all nodes, in shared mode
1331
    self.needed_locks = {}
1332
    self.share_locks[locking.LEVEL_NODE] = 1
1333
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1334

    
1335
  def CheckPrereq(self):
1336
    """Check prerequisites.
1337

1338
    """
1339

    
1340
  @staticmethod
1341
  def _DiagnoseByOS(node_list, rlist):
1342
    """Remaps a per-node return list into an a per-os per-node dictionary
1343

1344
      Args:
1345
        node_list: a list with the names of all nodes
1346
        rlist: a map with node names as keys and OS objects as values
1347

1348
      Returns:
1349
        map: a map with osnames as keys and as value another map, with
1350
             nodes as
1351
             keys and list of OS objects as values
1352
             e.g. {"debian-etch": {"node1": [<object>,...],
1353
                                   "node2": [<object>,]}
1354
                  }
1355

1356
    """
1357
    all_os = {}
1358
    for node_name, nr in rlist.iteritems():
1359
      if not nr:
1360
        continue
1361
      for os_obj in nr:
1362
        if os_obj.name not in all_os:
1363
          # build a list of nodes for this os containing empty lists
1364
          # for each node in node_list
1365
          all_os[os_obj.name] = {}
1366
          for nname in node_list:
1367
            all_os[os_obj.name][nname] = []
1368
        all_os[os_obj.name][node_name].append(os_obj)
1369
    return all_os
1370

    
1371
  def Exec(self, feedback_fn):
1372
    """Compute the list of OSes.
1373

1374
    """
1375
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1376
    node_data = self.rpc.call_os_diagnose(node_list)
1377
    if node_data == False:
1378
      raise errors.OpExecError("Can't gather the list of OSes")
1379
    pol = self._DiagnoseByOS(node_list, node_data)
1380
    output = []
1381
    for os_name, os_data in pol.iteritems():
1382
      row = []
1383
      for field in self.op.output_fields:
1384
        if field == "name":
1385
          val = os_name
1386
        elif field == "valid":
1387
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1388
        elif field == "node_status":
1389
          val = {}
1390
          for node_name, nos_list in os_data.iteritems():
1391
            val[node_name] = [(v.status, v.path) for v in nos_list]
1392
        else:
1393
          raise errors.ParameterError(field)
1394
        row.append(val)
1395
      output.append(row)
1396

    
1397
    return output
1398

    
1399

    
1400
class LURemoveNode(LogicalUnit):
1401
  """Logical unit for removing a node.
1402

1403
  """
1404
  HPATH = "node-remove"
1405
  HTYPE = constants.HTYPE_NODE
1406
  _OP_REQP = ["node_name"]
1407

    
1408
  def BuildHooksEnv(self):
1409
    """Build hooks env.
1410

1411
    This doesn't run on the target node in the pre phase as a failed
1412
    node would then be impossible to remove.
1413

1414
    """
1415
    env = {
1416
      "OP_TARGET": self.op.node_name,
1417
      "NODE_NAME": self.op.node_name,
1418
      }
1419
    all_nodes = self.cfg.GetNodeList()
1420
    all_nodes.remove(self.op.node_name)
1421
    return env, all_nodes, all_nodes
1422

    
1423
  def CheckPrereq(self):
1424
    """Check prerequisites.
1425

1426
    This checks:
1427
     - the node exists in the configuration
1428
     - it does not have primary or secondary instances
1429
     - it's not the master
1430

1431
    Any errors are signalled by raising errors.OpPrereqError.
1432

1433
    """
1434
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1435
    if node is None:
1436
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1437

    
1438
    instance_list = self.cfg.GetInstanceList()
1439

    
1440
    masternode = self.cfg.GetMasterNode()
1441
    if node.name == masternode:
1442
      raise errors.OpPrereqError("Node is the master node,"
1443
                                 " you need to failover first.")
1444

    
1445
    for instance_name in instance_list:
1446
      instance = self.cfg.GetInstanceInfo(instance_name)
1447
      if node.name == instance.primary_node:
1448
        raise errors.OpPrereqError("Instance %s still running on the node,"
1449
                                   " please remove first." % instance_name)
1450
      if node.name in instance.secondary_nodes:
1451
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1452
                                   " please remove first." % instance_name)
1453
    self.op.node_name = node.name
1454
    self.node = node
1455

    
1456
  def Exec(self, feedback_fn):
1457
    """Removes the node from the cluster.
1458

1459
    """
1460
    node = self.node
1461
    logger.Info("stopping the node daemon and removing configs from node %s" %
1462
                node.name)
1463

    
1464
    self.context.RemoveNode(node.name)
1465

    
1466
    self.rpc.call_node_leave_cluster(node.name)
1467

    
1468

    
1469
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


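# Illustrative sketch (not part of the original module): the static/dynamic
# field split above drives the locking decision in LUQueryNodes and
# LUQueryInstances -- a query that only asks for configuration-backed
# (static) fields can run without locks, while any dynamic field forces
# locking so the live RPC results stay consistent.  The field names used
# below are a small example subset, not the real lists.
def _ExampleWouldLockNodes(output_fields):
  """Mirror of the "do_locking" rule used by the query LUs.

  >>> _ExampleWouldLockNodes(["name", "pip"])
  False
  >>> _ExampleWouldLockNodes(["name", "mfree"])
  True

  """
  static_fields = frozenset(["name", "pinst_cnt", "pip", "sip"])
  return not static_fields.issuperset(output_fields)

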
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


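# Illustrative sketch (not used above) of the for/else lookup that maps a
# logical volume back to its owning instance: the else branch of the for
# loop only runs when no break happened, i.e. when no instance owns the
# volume.  The instance/volume names in the doctest are invented.
def _ExampleVolumeOwner(vol_name, node, lv_by_node):
  """Return the owning instance name for vol_name on node, or '-'.

  >>> owners = {"inst1.example.com": {"node1": ["vol1", "vol2"]}}
  >>> _ExampleVolumeOwner("vol2", "node1", owners)
  'inst1.example.com'
  >>> _ExampleVolumeOwner("vol9", "node1", owners)
  '-'

  """
  for inst_name, node_map in lv_by_node.items():
    if node not in node_map:
      continue
    if vol_name in node_map[node]:
      owner = inst_name
      break
  else:
    owner = '-'
  return owner

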
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_has_ip_address(new_node.name,
                                               new_node.secondary_ip):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


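# Illustrative sketch of the homing check in LUAddNode.CheckPrereq: a node
# is "single homed" when its secondary IP equals its primary IP, and a new
# node may only join if it is homed the same way as the master.  This is a
# plain comparison with no cluster access; the addresses in the doctest
# are examples only.
def _ExampleHomingMatches(master_pip, master_sip, new_pip, new_sip):
  """Return True if the new node's homing matches the master's.

  >>> _ExampleHomingMatches("10.0.0.1", "10.0.0.1", "10.0.0.2", "10.0.0.2")
  True
  >>> _ExampleHomingMatches("10.0.0.1", "10.1.0.1", "10.0.0.2", "10.0.0.2")
  False

  """
  master_singlehomed = master_sip == master_pip
  newbie_singlehomed = new_sip == new_pip
  return master_singlehomed == newbie_singlehomed

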
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "hypervisor_type": cluster.hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": cluster.hvparams,
      "beparams": cluster.beparams,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

    static_fields = ["cluster_name", "master_node", "drain_flag"]
    _CheckOutputFields(static=static_fields,
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the values of the requested configuration fields.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


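# Illustrative sketch of the two-pass ordering used by
# _AssembleInstanceDisks: every (node, disk) pair is first assembled with
# is_primary=False, and only afterwards is the primary node's copy
# assembled as primary.  The toy function below only records the call
# order; disk and node names in the doctest are invented.
def _ExampleTwoPassOrder(disks, primary_node):
  """Return the (node, disk, is_primary) call order of the two passes.

  >>> _ExampleTwoPassOrder([("sda", ["node1", "node2"])], "node1")
  [('node1', 'sda', False), ('node2', 'sda', False), ('node1', 'sda', True)]

  """
  calls = []
  # 1st pass: secondary mode everywhere
  for iv_name, nodes in disks:
    for node in nodes:
      calls.append((node, iv_name, False))
  # 2nd pass: primary mode, primary node only
  for iv_name, nodes in disks:
    for node in nodes:
      if node != primary_node:
        continue
      calls.append((node, iv_name, True))
  return calls

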
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


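# Illustrative truth table for the error handling in
# _ShutdownInstanceDisks: a failed shutdown only degrades the overall
# result when the node is not the primary, or when ignore_primary is
# False.  This is a standalone restatement of that condition, not code
# used by the function above.
def _ExampleShutdownFailureCounts(node_is_primary, ignore_primary):
  """Return True if a shutdown failure on this node affects the result.

  >>> _ExampleShutdownFailureCounts(True, True)
  False
  >>> _ExampleShutdownFailureCounts(True, False)
  True
  >>> _ExampleShutdownFailureCounts(False, True)
  True

  """
  return not ignore_primary or not node_is_primary

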
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function checks whether a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


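# Illustrative sketch of the data _CheckNodeFreeMemory works on: the
# node_info RPC answers with a per-node dict whose 'memory_free' entry (in
# MiB) is compared against the requested amount.  The reply dict in the
# doctest is a hand-written stand-in for an RPC answer, used only for
# illustration.
def _ExampleHasEnoughMemory(nodeinfo, node, requested):
  """Return True if 'node' reports at least 'requested' MiB free.

  >>> reply = {"node1.example.com": {"memory_free": 2048}}
  >>> _ExampleHasEnoughMemory(reply, "node1.example.com", 1024)
  True
  >>> _ExampleHasEnoughMemory(reply, "node1.example.com", 4096)
  False

  """
  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    return False
  return requested <= free_mem

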
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    if not self.rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not self.rpc.call_instance_reboot(node_current, instance,
                                           reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


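# Illustrative sketch of the reboot dispatch above: soft and hard reboots
# are forwarded to the node daemon as a single reboot call, while a full
# reboot is emulated as shutdown + disk restart + start.  The strings
# below mirror the INSTANCE_REBOOT_* constants but are hard-coded here
# only for illustration.
def _ExampleRebootPlan(reboot_type):
  """Return the list of steps a given reboot type expands to.

  >>> _ExampleRebootPlan("soft")
  ['reboot']
  >>> _ExampleRebootPlan("full")
  ['shutdown', 'shutdown disks', 'start disks', 'start']

  """
  if reboot_type in ("soft", "hard"):
    return ["reboot"]
  return ["shutdown", "shutdown disks", "start disks", "start"]

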
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not self.rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
                                           "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                               old_name,
                                               "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


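# Illustrative sketch of the path handling in LURenameInstance.Exec for
# file-based disks: the storage directory is taken as the dirname of the
# first disk's file path, both before and after the rename.  The paths in
# the doctest are invented; the real layout comes from the configuration.
def _ExampleFileStorageDirs(old_disk_path, new_disk_path):
  """Return the (old, new) storage directories for a renamed instance.

  >>> _ExampleFileStorageDirs("/srv/ganeti/file-storage/inst1/disk0",
  ...                         "/srv/ganeti/file-storage/inst2/disk0")
  ('/srv/ganeti/file-storage/inst1', '/srv/ganeti/file-storage/inst2')

  """
  return (os.path.dirname(old_disk_path), os.path.dirname(new_disk_path))

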
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
    bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port",
      "serial_no", "hypervisor", "hvparams",
      ] + hvp + bep)

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


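# Illustrative sketch of how the "status" field above is derived from the
# configured (admin) state and the live data reported by the node; this is
# a standalone restatement of the branch structure, not code used by the
# LU itself.
def _ExampleInstanceStatus(node_bad, running, admin_up):
  """Return the status string LUQueryInstances would report.

  >>> _ExampleInstanceStatus(True, False, True)
  'ERROR_nodedown'
  >>> _ExampleInstanceStatus(False, True, True)
  'running'
  >>> _ExampleInstanceStatus(False, True, False)
  'ERROR_up'
  >>> _ExampleInstanceStatus(False, False, True)
  'ERROR_down'
  >>> _ExampleInstanceStatus(False, False, False)
  'ADMIN_down'

  """
  if node_bad:
    return "ERROR_nodedown"
  if running:
    if admin_up:
      return "running"
    return "ERROR_up"
  if admin_up:
    return "ERROR_down"
  return "ADMIN_down"

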
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


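# Overview (derived from the code below): the disk layout produced per
# template is roughly: diskless -> no disks; plain -> two LVs (sda, sdb);
# drbd8 -> two DRBD8 devices, each backed by a data LV plus a 128 MB
# metadata LV, with one DRBD minor allocated per device on each node;
# file -> two file-backed disks under file_storage_dir.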
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                      ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


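# Worked example (sketch, with hypothetical sizes): for the drbd8 template
# with disk_size=10240 and swap_size=2048, _ComputeDiskSize returns
# 10240 + 2048 + 256 = 12544 MB of volume group space needed per node; the
# extra 256 MB covers the two 128 MB DRBD metadata volumes.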
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


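# Note on opcode parameters: _OP_REQP below lists the fields that must be
# present on the opcode; pnode, snode, iallocator and hypervisor are
# optional and are defaulted to None in ExpandNames before use.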
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_size",
              "disk_template", "swap_size", "mode", "start",
              "wait_for_sync", "ip_check", "mac",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

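  # Rough sequence of Exec() below: generate the disk objects, create the
  # block devices, add the instance to the configuration, wait for the
  # mirrors to sync (or bail out and roll back), run the OS create/import
  # scripts, and finally start the instance if requested.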
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image,
                                                cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


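# Mode summary (as enforced in CheckPrereq below): for drbd8 instances only
# REPLACE_DISK_PRI and REPLACE_DISK_SEC are executed; REPLACE_DISK_ALL with
# an explicit new secondary is converted to REPLACE_DISK_SEC, and
# REPLACE_DISK_ALL on its own is rejected.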
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))


    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev, new_minor in zip(instance.disks, minors):
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")
4270

    
4271
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

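  # Illustrative only: a grow request is normally submitted through the
  # matching opcode, roughly opcodes.OpGrowDisk(instance_name="inst1",
  # disk="sda", amount=1024, wait_for_sync=True), with "amount" expressed
  # in MiB (the same unit used by the free-space check above); the exact
  # opcode definition lives in opcodes.py and is assumed here.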
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed on node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed on node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        logger.Error("Warning: disk sync-ing has not returned a good status.\n"
                     " Please check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" %
                                     name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


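# Note on parameter handling in the LU below: hvparams/beparams sent in the
# opcode are merged into the instance-level dicts (a value of None deletes a
# key), and cluster.FillDict() then layers the cluster defaults underneath to
# obtain the effective values; e.g. assuming cluster defaults of
# {"memory": 128, "vcpus": 1} and an instance override of {"memory": 512},
# the effective beparams would be {"memory": 512, "vcpus": 1}.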
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "hvparams"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE


  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.ip, self.bridge, self.mac]
    if (all_parms.count(None) == len(all_parms) and
        not self.op.hvparams and
        not self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
      val = self.op.beparams.get(item, None)
      if val is not None:
        try:
          val = int(val)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
        self.op.beparams[item] = val
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val is None:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

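    # Worked example (values are illustrative): raising BE_MEMORY to 2048 MiB
    # while the instance currently uses 512 MiB and the primary node reports
    # 1024 MiB free gives miss_mem = 2048 - 512 - 1024 = 512 > 0, so the
    # change below is refused instead of leaving an unstartable instance.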
    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return self.rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

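  # The export itself, below, proceeds in broad strokes as follows: optionally
  # shut the instance down, snapshot its "sda" volume, restart the instance if
  # it was running, copy the snapshot to the target node, remove the snapshot,
  # finalize the export there, and finally prune any older export of the same
  # instance from other nodes.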
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
      if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance, cluster_name):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not self.rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the matching (path, tag) pairs.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


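# Typical use of the framework below (mirrored by LUTestAllocator at the end
# of this module): an LU builds an IAllocator(self, mode=..., name=..., ...)
# with the mode-specific keyword arguments, calls ial.Run(allocator_name),
# and then inspects ial.success, ial.info and ial.nodes to decide where to
# place or relocate the instance.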
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }

    i_list = []
    cluster = cfg.GetClusterInfo()
    for iname in cfg.GetInstanceList():
      i_obj = cfg.GetInstanceInfo(iname)
      i_list.append((i_obj, cluster.FillBE(i_obj)))

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    # FIXME: here we have only one hypervisor information, but
    # instance can belong to different hypervisors
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           cfg.GetHypervisorType())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

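  # For reference, a serialized "request" section built by the method below
  # might look roughly like (values invented for illustration):
  #   {"type": "allocate", "name": "inst1.example.com", "disk_template": "drbd",
  #    "tags": [], "os": "debian-etch", "vcpus": 1, "memory": 512,
  #    "disks": [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #    "disk_space_total": <result of _ComputeDiskSize>, "nics": [...],
  #    "required_nodes": 2}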
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

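  # The external allocator is expected to print a JSON object on stdout with
  # at least the keys checked below, e.g. (illustrative values):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node3.example.com"]}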
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result