root / lib / cmdlib.py @ 23828f1c
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)
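
  # Editor's note: an illustrative sketch only (not part of the original
  # module) of how a concrete LU uses the class attributes documented above.
  # The class name and opcode field are hypothetical; NoHooksLU is defined
  # later in this module.
  #
  #   class LUExampleQuery(NoHooksLU):
  #     _OP_REQP = ["nodes"]      # the opcode must carry a 'nodes' attribute
  #     REQ_BGL = False           # opt in to fine-grained locking
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {}  # this example needs no locks at all
  #
  #     def CheckPrereq(self):
  #       pass                    # nothing to verify for this example
  #
  #     def Exec(self, feedback_fn):
  #       return self.cfg.GetNodeList()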

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    "No nodes" should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
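
  # Editor's note: an illustrative, hypothetical BuildHooksEnv body showing
  # the (env, pre-nodes, post-nodes) tuple described above; real
  # implementations appear further down (e.g. LURenameCluster.BuildHooksEnv).
  #
  #   def BuildHooksEnv(self):
  #     env = {
  #       "OP_TARGET": self.op.node_name,   # no "GANETI_" prefix here
  #       }
  #     mn = self.cfg.GetMasterNode()
  #     return env, [mn], [mn]              # run pre and post hooks on master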

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
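
  # Editor's note: illustrative (hypothetical) usage of the helper above from
  # an instance-level LU's ExpandNames; it also pre-declares node locks so
  # that DeclareLocks can fill them in later (see _LockInstancesNodes below).
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE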

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
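
  # Editor's note: a sketch (hypothetical LU) of the DeclareLocks counterpart
  # to the helper above, matching the snippet quoted in its docstring.
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       # needed_locks[LEVEL_NODE] was declared empty in ExpandNames and
  #       # recalculate_locks[LEVEL_NODE] was set to constants.LOCKS_REPLACE,
  #       # so this fills in the node locks of the now-locked instances.
  #       self._LockInstancesNodes()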


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the user

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
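
# Editor's note: a minimal illustration of the check above; the field names
# used here are made up.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes silently
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])           # raises OpPrereqError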


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
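
# Editor's note: for illustration, a call such as (all values hypothetical)
#
#   _BuildInstanceHookEnv("inst1.example.tld", "node1", ["node2"], "debian",
#                         "up", 512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
#
# yields INSTANCE_NIC_COUNT=1 plus INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE and
# INSTANCE_NIC0_HWADDR entries, alongside the INSTANCE_* keys built above.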


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,  # the run status ('up'/'down'), not the OS name
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is secondary for, should a
      # single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
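
  # Editor's note: a worked example of the check above, with made-up numbers.
  # If node B is secondary for two auto-balanced instances whose primary is
  # node A, with BE_MEMORY values of 512 and 1024, then needed_mem for the
  # (A -> B) failover group is 1536; the check fails if B's 'mfree' is below
  # that.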

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
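
  # Editor's note: the tuple returned above unpacks, in order, into
  # (nodes that could not be queried, per-node LVM error messages,
  # instances with offline volumes, per-instance lists of missing
  # (node, volume) pairs); these are the same four objects the method
  # mutates while scanning, so callers can pick the piece they care about.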


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether an LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given parameters don't conflict and
    whether the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)
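
  # Editor's note: an illustrative sketch of the parameter merge performed in
  # CheckPrereq above. FillDict-style merging keeps the existing defaults and
  # overlays only the keys the opcode supplies; values here are made up.
  #
  #   defaults = {'memory': 128, 'vcpus': 1, 'auto_balance': True}
  #   op_beparams = {'memory': 512}
  #   # cluster.FillDict(defaults, op_beparams) ->
  #   #   {'memory': 512, 'vcpus': 1, 'auto_balance': True}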


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
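
# Editor's note: each mstat entry above is a per-disk status tuple of the form
# (percent_done, estimated_seconds, is_degraded, ldisk); percent_done is None
# once the disk has nothing left to sync, which is why the loop treats a
# non-None value as "still syncing". The trailing ldisk flag is unused here
# but is the local storage status consulted by _CheckDiskConsistency below.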
1283

    
1284

    
1285
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1286
  """Check that mirrors are not degraded.
1287

1288
  The ldisk parameter, if True, will change the test from the
1289
  is_degraded attribute (which represents overall non-ok status for
1290
  the device(s)) to the ldisk (representing the local storage status).
1291

1292
  """
1293
  lu.cfg.SetDiskID(dev, node)
1294
  if ldisk:
1295
    idx = 6
1296
  else:
1297
    idx = 5
1298

    
1299
  result = True
1300
  if on_primary or dev.AssembleOnSecondary():
1301
    rstats = lu.rpc.call_blockdev_find(node, dev)
1302
    if not rstats:
1303
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1304
      result = False
1305
    else:
1306
      result = result and (not rstats[idx])
1307
  if dev.children:
1308
    for child in dev.children:
1309
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1310

    
1311
  return result
1312

    
1313

    
1314
class LUDiagnoseOS(NoHooksLU):
1315
  """Logical unit for OS diagnose/query.
1316

1317
  """
1318
  _OP_REQP = ["output_fields", "names"]
1319
  REQ_BGL = False
1320

    
1321
  def ExpandNames(self):
1322
    if self.op.names:
1323
      raise errors.OpPrereqError("Selective OS query not supported")
1324

    
1325
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1326
    _CheckOutputFields(static=[],
1327
                       dynamic=self.dynamic_fields,
1328
                       selected=self.op.output_fields)
1329

    
1330
    # Lock all nodes, in shared mode
1331
    self.needed_locks = {}
1332
    self.share_locks[locking.LEVEL_NODE] = 1
1333
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1334

    
1335
  def CheckPrereq(self):
1336
    """Check prerequisites.
1337

1338
    """
1339

    
1340
  @staticmethod
1341
  def _DiagnoseByOS(node_list, rlist):
1342
    """Remaps a per-node return list into an a per-os per-node dictionary
1343

1344
      Args:
1345
        node_list: a list with the names of all nodes
1346
        rlist: a map with node names as keys and OS objects as values
1347

1348
      Returns:
1349
        map: a map with osnames as keys and as value another map, with
1350
             nodes as
1351
             keys and list of OS objects as values
1352
             e.g. {"debian-etch": {"node1": [<object>,...],
1353
                                   "node2": [<object>,]}
1354
                  }
1355

1356
    """
1357
    all_os = {}
1358
    for node_name, nr in rlist.iteritems():
1359
      if not nr:
1360
        continue
1361
      for os_obj in nr:
1362
        if os_obj.name not in all_os:
1363
          # build a list of nodes for this os containing empty lists
1364
          # for each node in node_list
1365
          all_os[os_obj.name] = {}
1366
          for nname in node_list:
1367
            all_os[os_obj.name][nname] = []
1368
        all_os[os_obj.name][node_name].append(os_obj)
1369
    return all_os
1370

    
1371
  def Exec(self, feedback_fn):
1372
    """Compute the list of OSes.
1373

1374
    """
1375
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1376
    node_data = self.rpc.call_os_diagnose(node_list)
1377
    if node_data == False:
1378
      raise errors.OpExecError("Can't gather the list of OSes")
1379
    pol = self._DiagnoseByOS(node_list, node_data)
1380
    output = []
1381
    for os_name, os_data in pol.iteritems():
1382
      row = []
1383
      for field in self.op.output_fields:
1384
        if field == "name":
1385
          val = os_name
1386
        elif field == "valid":
1387
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1388
        elif field == "node_status":
1389
          val = {}
1390
          for node_name, nos_list in os_data.iteritems():
1391
            val[node_name] = [(v.status, v.path) for v in nos_list]
1392
        else:
1393
          raise errors.ParameterError(field)
1394
        row.append(val)
1395
      output.append(row)
1396

    
1397
    return output
1398

    
1399

    
1400
class LURemoveNode(LogicalUnit):
1401
  """Logical unit for removing a node.
1402

1403
  """
1404
  HPATH = "node-remove"
1405
  HTYPE = constants.HTYPE_NODE
1406
  _OP_REQP = ["node_name"]
1407

    
1408
  def BuildHooksEnv(self):
1409
    """Build hooks env.
1410

1411
    This doesn't run on the target node in the pre phase as a failed
1412
    node would then be impossible to remove.
1413

1414
    """
1415
    env = {
1416
      "OP_TARGET": self.op.node_name,
1417
      "NODE_NAME": self.op.node_name,
1418
      }
1419
    all_nodes = self.cfg.GetNodeList()
1420
    all_nodes.remove(self.op.node_name)
1421
    return env, all_nodes, all_nodes
1422

    
1423
  def CheckPrereq(self):
1424
    """Check prerequisites.
1425

1426
    This checks:
1427
     - the node exists in the configuration
1428
     - it does not have primary or secondary instances
1429
     - it's not the master
1430

1431
    Any errors are signalled by raising errors.OpPrereqError.
1432

1433
    """
1434
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1435
    if node is None:
1436
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1437

    
1438
    instance_list = self.cfg.GetInstanceList()
1439

    
1440
    masternode = self.cfg.GetMasterNode()
1441
    if node.name == masternode:
1442
      raise errors.OpPrereqError("Node is the master node,"
1443
                                 " you need to failover first.")
1444

    
1445
    for instance_name in instance_list:
1446
      instance = self.cfg.GetInstanceInfo(instance_name)
1447
      if node.name == instance.primary_node:
1448
        raise errors.OpPrereqError("Instance %s still running on the node,"
1449
                                   " please remove first." % instance_name)
1450
      if node.name in instance.secondary_nodes:
1451
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1452
                                   " please remove first." % instance_name)
1453
    self.op.node_name = node.name
1454
    self.node = node
1455

    
1456
  def Exec(self, feedback_fn):
1457
    """Removes the node from the cluster.
1458

1459
    """
1460
    node = self.node
1461
    logger.Info("stopping the node daemon and removing configs from node %s" %
1462
                node.name)
1463

    
1464
    self.context.RemoveNode(node.name)
1465

    
1466
    self.rpc.call_node_leave_cluster(node.name)
1467

    
1468

    
1469
class LUQueryNodes(NoHooksLU):
1470
  """Logical unit for querying nodes.
1471

1472
  """
1473
  _OP_REQP = ["output_fields", "names"]
1474
  REQ_BGL = False
1475

    
1476
  def ExpandNames(self):
1477
    self.dynamic_fields = frozenset([
1478
      "dtotal", "dfree",
1479
      "mtotal", "mnode", "mfree",
1480
      "bootid",
1481
      "ctotal",
1482
      ])
1483

    
1484
    self.static_fields = frozenset([
1485
      "name", "pinst_cnt", "sinst_cnt",
1486
      "pinst_list", "sinst_list",
1487
      "pip", "sip", "tags",
1488
      "serial_no",
1489
      ])
1490

    
1491
    _CheckOutputFields(static=self.static_fields,
1492
                       dynamic=self.dynamic_fields,
1493
                       selected=self.op.output_fields)
1494

    
1495
    self.needed_locks = {}
1496
    self.share_locks[locking.LEVEL_NODE] = 1
1497

    
1498
    if self.op.names:
1499
      self.wanted = _GetWantedNodes(self, self.op.names)
1500
    else:
1501
      self.wanted = locking.ALL_SET
1502

    
1503
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1504
    if self.do_locking:
1505
      # if we don't request only static fields, we need to lock the nodes
1506
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1507

    
1508

    
1509
  def CheckPrereq(self):
1510
    """Check prerequisites.
1511

1512
    """
1513
    # The validation of the node list is done in _GetWantedNodes if the
1514
    # list is non-empty; an empty list needs no validation
1515
    pass
1516

    
1517
  def Exec(self, feedback_fn):
1518
    """Computes the list of nodes and their attributes.
1519

1520
    """
1521
    all_info = self.cfg.GetAllNodesInfo()
1522
    if self.do_locking:
1523
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1524
    elif self.wanted != locking.ALL_SET:
1525
      nodenames = self.wanted
1526
      missing = set(nodenames).difference(all_info.keys())
1527
      if missing:
1528
        raise errors.OpExecError(
1529
          "Some nodes were removed before retrieving their data: %s" % missing)
1530
    else:
1531
      nodenames = all_info.keys()
1532

    
1533
    nodenames = utils.NiceSort(nodenames)
1534
    nodelist = [all_info[name] for name in nodenames]
1535

    
1536
    # begin data gathering
1537

    
1538
    if self.dynamic_fields.intersection(self.op.output_fields):
1539
      live_data = {}
1540
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1541
                                          self.cfg.GetHypervisorType())
1542
      for name in nodenames:
1543
        nodeinfo = node_data.get(name, None)
1544
        if nodeinfo:
1545
          live_data[name] = {
1546
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1547
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1548
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1549
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1550
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1551
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1552
            "bootid": nodeinfo['bootid'],
1553
            }
1554
        else:
1555
          live_data[name] = {}
1556
    else:
1557
      live_data = dict.fromkeys(nodenames, {})
1558

    
1559
    node_to_primary = dict([(name, set()) for name in nodenames])
1560
    node_to_secondary = dict([(name, set()) for name in nodenames])
1561

    
1562
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1563
                             "sinst_cnt", "sinst_list"))
1564
    if inst_fields & frozenset(self.op.output_fields):
1565
      instancelist = self.cfg.GetInstanceList()
1566

    
1567
      for instance_name in instancelist:
1568
        inst = self.cfg.GetInstanceInfo(instance_name)
1569
        if inst.primary_node in node_to_primary:
1570
          node_to_primary[inst.primary_node].add(inst.name)
1571
        for secnode in inst.secondary_nodes:
1572
          if secnode in node_to_secondary:
1573
            node_to_secondary[secnode].add(inst.name)
1574

    
1575
    # end data gathering
1576

    
1577
    output = []
1578
    for node in nodelist:
1579
      node_output = []
1580
      for field in self.op.output_fields:
1581
        if field == "name":
1582
          val = node.name
1583
        elif field == "pinst_list":
1584
          val = list(node_to_primary[node.name])
1585
        elif field == "sinst_list":
1586
          val = list(node_to_secondary[node.name])
1587
        elif field == "pinst_cnt":
1588
          val = len(node_to_primary[node.name])
1589
        elif field == "sinst_cnt":
1590
          val = len(node_to_secondary[node.name])
1591
        elif field == "pip":
1592
          val = node.primary_ip
1593
        elif field == "sip":
1594
          val = node.secondary_ip
1595
        elif field == "tags":
1596
          val = list(node.GetTags())
1597
        elif field == "serial_no":
1598
          val = node.serial_no
1599
        elif field in self.dynamic_fields:
1600
          val = live_data[node.name].get(field, None)
1601
        else:
1602
          raise errors.ParameterError(field)
1603
        node_output.append(val)
1604
      output.append(node_output)
1605

    
1606
    return output
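# Illustrative sketch (not part of the Ganeti API; the field names below are
# only examples): the locking decision made in LUQueryNodes.ExpandNames above
# is a plain set comparison -- node locks are only needed when at least one
# requested field is dynamic, i.e. has to be read from the live node instead
# of the configuration.
def _example_needs_node_locks(output_fields,
                              static_fields=frozenset(["name", "pip", "sip",
                                                       "tags", "serial_no"])):
  """Return True if the requested fields require locking the nodes."""
  return not static_fields.issuperset(output_fields)

# _example_needs_node_locks(["name", "pip"])   -> False (config data only)
# _example_needs_node_locks(["name", "mfree"]) -> True  (live data needed)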
1607

    
1608

    
1609
class LUQueryNodeVolumes(NoHooksLU):
1610
  """Logical unit for getting volumes on node(s).
1611

1612
  """
1613
  _OP_REQP = ["nodes", "output_fields"]
1614
  REQ_BGL = False
1615

    
1616
  def ExpandNames(self):
1617
    _CheckOutputFields(static=["node"],
1618
                       dynamic=["phys", "vg", "name", "size", "instance"],
1619
                       selected=self.op.output_fields)
1620

    
1621
    self.needed_locks = {}
1622
    self.share_locks[locking.LEVEL_NODE] = 1
1623
    if not self.op.nodes:
1624
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1625
    else:
1626
      self.needed_locks[locking.LEVEL_NODE] = \
1627
        _GetWantedNodes(self, self.op.nodes)
1628

    
1629
  def CheckPrereq(self):
1630
    """Check prerequisites.
1631

1632
    This checks that the fields required are valid output fields.
1633

1634
    """
1635
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1636

    
1637
  def Exec(self, feedback_fn):
1638
    """Computes the list of nodes and their attributes.
1639

1640
    """
1641
    nodenames = self.nodes
1642
    volumes = self.rpc.call_node_volumes(nodenames)
1643

    
1644
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1645
             in self.cfg.GetInstanceList()]
1646

    
1647
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1648

    
1649
    output = []
1650
    for node in nodenames:
1651
      if node not in volumes or not volumes[node]:
1652
        continue
1653

    
1654
      node_vols = volumes[node][:]
1655
      node_vols.sort(key=lambda vol: vol['dev'])
1656

    
1657
      for vol in node_vols:
1658
        node_output = []
1659
        for field in self.op.output_fields:
1660
          if field == "node":
1661
            val = node
1662
          elif field == "phys":
1663
            val = vol['dev']
1664
          elif field == "vg":
1665
            val = vol['vg']
1666
          elif field == "name":
1667
            val = vol['name']
1668
          elif field == "size":
1669
            val = int(float(vol['size']))
1670
          elif field == "instance":
1671
            for inst in ilist:
1672
              if node not in lv_by_node[inst]:
1673
                continue
1674
              if vol['name'] in lv_by_node[inst][node]:
1675
                val = inst.name
1676
                break
1677
            else:
1678
              val = '-'
1679
          else:
1680
            raise errors.ParameterError(field)
1681
          node_output.append(str(val))
1682

    
1683
        output.append(node_output)
1684

    
1685
    return output
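# Illustrative sketch (standalone, simplified data layout): the "instance"
# column above relies on Python's for/else -- the else branch only runs when
# the loop finished without hitting a break, i.e. no instance owns the volume.
def _example_find_lv_owner(lv_name, node, lv_by_node):
  """lv_by_node: dict of instance name -> {node name: [LV names]}."""
  for inst_name, node_map in lv_by_node.items():
    if node not in node_map:
      continue
    if lv_name in node_map[node]:
      owner = inst_name
      break
  else:
    owner = "-"
  return owner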
1686

    
1687

    
1688
class LUAddNode(LogicalUnit):
1689
  """Logical unit for adding node to the cluster.
1690

1691
  """
1692
  HPATH = "node-add"
1693
  HTYPE = constants.HTYPE_NODE
1694
  _OP_REQP = ["node_name"]
1695

    
1696
  def BuildHooksEnv(self):
1697
    """Build hooks env.
1698

1699
    This will run on all nodes before, and on all nodes + the new node after.
1700

1701
    """
1702
    env = {
1703
      "OP_TARGET": self.op.node_name,
1704
      "NODE_NAME": self.op.node_name,
1705
      "NODE_PIP": self.op.primary_ip,
1706
      "NODE_SIP": self.op.secondary_ip,
1707
      }
1708
    nodes_0 = self.cfg.GetNodeList()
1709
    nodes_1 = nodes_0 + [self.op.node_name, ]
1710
    return env, nodes_0, nodes_1
1711

    
1712
  def CheckPrereq(self):
1713
    """Check prerequisites.
1714

1715
    This checks:
1716
     - the new node is not already in the config
1717
     - it is resolvable
1718
     - its parameters (single/dual homed) match the cluster
1719

1720
    Any errors are signalled by raising errors.OpPrereqError.
1721

1722
    """
1723
    node_name = self.op.node_name
1724
    cfg = self.cfg
1725

    
1726
    dns_data = utils.HostInfo(node_name)
1727

    
1728
    node = dns_data.name
1729
    primary_ip = self.op.primary_ip = dns_data.ip
1730
    secondary_ip = getattr(self.op, "secondary_ip", None)
1731
    if secondary_ip is None:
1732
      secondary_ip = primary_ip
1733
    if not utils.IsValidIP(secondary_ip):
1734
      raise errors.OpPrereqError("Invalid secondary IP given")
1735
    self.op.secondary_ip = secondary_ip
1736

    
1737
    node_list = cfg.GetNodeList()
1738
    if not self.op.readd and node in node_list:
1739
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1740
                                 node)
1741
    elif self.op.readd and node not in node_list:
1742
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1743

    
1744
    for existing_node_name in node_list:
1745
      existing_node = cfg.GetNodeInfo(existing_node_name)
1746

    
1747
      if self.op.readd and node == existing_node_name:
1748
        if (existing_node.primary_ip != primary_ip or
1749
            existing_node.secondary_ip != secondary_ip):
1750
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1751
                                     " address configuration as before")
1752
        continue
1753

    
1754
      if (existing_node.primary_ip == primary_ip or
1755
          existing_node.secondary_ip == primary_ip or
1756
          existing_node.primary_ip == secondary_ip or
1757
          existing_node.secondary_ip == secondary_ip):
1758
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1759
                                   " existing node %s" % existing_node.name)
1760

    
1761
    # check that the type of the node (single versus dual homed) is the
1762
    # same as for the master
1763
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1764
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1765
    newbie_singlehomed = secondary_ip == primary_ip
1766
    if master_singlehomed != newbie_singlehomed:
1767
      if master_singlehomed:
1768
        raise errors.OpPrereqError("The master has no private ip but the"
1769
                                   " new node has one")
1770
      else:
1771
        raise errors.OpPrereqError("The master has a private ip but the"
1772
                                   " new node doesn't have one")
1773

    
1774
    # checks reachability
1775
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1776
      raise errors.OpPrereqError("Node not reachable by ping")
1777

    
1778
    if not newbie_singlehomed:
1779
      # check reachability from my secondary ip to newbie's secondary ip
1780
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1781
                           source=myself.secondary_ip):
1782
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1783
                                   " based ping to noded port")
1784

    
1785
    self.new_node = objects.Node(name=node,
1786
                                 primary_ip=primary_ip,
1787
                                 secondary_ip=secondary_ip)
1788

    
1789
  def Exec(self, feedback_fn):
1790
    """Adds the new node to the cluster.
1791

1792
    """
1793
    new_node = self.new_node
1794
    node = new_node.name
1795

    
1796
    # check connectivity
1797
    result = self.rpc.call_version([node])[node]
1798
    if result:
1799
      if constants.PROTOCOL_VERSION == result:
1800
        logger.Info("communication to node %s fine, sw version %s match" %
1801
                    (node, result))
1802
      else:
1803
        raise errors.OpExecError("Version mismatch master version %s,"
1804
                                 " node version %s" %
1805
                                 (constants.PROTOCOL_VERSION, result))
1806
    else:
1807
      raise errors.OpExecError("Cannot get version from the new node")
1808

    
1809
    # setup ssh on node
1810
    logger.Info("copy ssh key to node %s" % node)
1811
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1812
    keyarray = []
1813
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1814
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1815
                priv_key, pub_key]
1816

    
1817
    for i in keyfiles:
1818
      f = open(i, 'r')
1819
      try:
1820
        keyarray.append(f.read())
1821
      finally:
1822
        f.close()
1823

    
1824
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1825
                                    keyarray[2],
1826
                                    keyarray[3], keyarray[4], keyarray[5])
1827

    
1828
    if not result:
1829
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1830

    
1831
    # Add node to our /etc/hosts, and add key to known_hosts
1832
    utils.AddHostToEtcHosts(new_node.name)
1833

    
1834
    if new_node.secondary_ip != new_node.primary_ip:
1835
      if not self.rpc.call_node_has_ip_address(new_node.name,
1836
                                               new_node.secondary_ip):
1837
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1838
                                 " you gave (%s). Please fix and re-run this"
1839
                                 " command." % new_node.secondary_ip)
1840

    
1841
    node_verify_list = [self.cfg.GetMasterNode()]
1842
    node_verify_param = {
1843
      'nodelist': [node],
1844
      # TODO: do a node-net-test as well?
1845
    }
1846

    
1847
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1848
                                       self.cfg.GetClusterName())
1849
    for verifier in node_verify_list:
1850
      if not result[verifier]:
1851
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1852
                                 " for remote verification" % verifier)
1853
      if result[verifier]['nodelist']:
1854
        for failed in result[verifier]['nodelist']:
1855
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1856
                      (verifier, result[verifier]['nodelist'][failed]))
1857
        raise errors.OpExecError("ssh/hostname verification failed.")
1858

    
1859
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1860
    # including the node just added
1861
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1862
    dist_nodes = self.cfg.GetNodeList()
1863
    if not self.op.readd:
1864
      dist_nodes.append(node)
1865
    if myself.name in dist_nodes:
1866
      dist_nodes.remove(myself.name)
1867

    
1868
    logger.Debug("Copying hosts and known_hosts to all nodes")
1869
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1870
      result = self.rpc.call_upload_file(dist_nodes, fname)
1871
      for to_node in dist_nodes:
1872
        if not result[to_node]:
1873
          logger.Error("copy of file %s to node %s failed" %
1874
                       (fname, to_node))
1875

    
1876
    to_copy = []
1877
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1878
      to_copy.append(constants.VNC_PASSWORD_FILE)
1879
    for fname in to_copy:
1880
      result = self.rpc.call_upload_file([node], fname)
1881
      if not result[node]:
1882
        logger.Error("could not copy file %s to node %s" % (fname, node))
1883

    
1884
    if self.op.readd:
1885
      self.context.ReaddNode(new_node)
1886
    else:
1887
      self.context.AddNode(new_node)
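# Illustrative sketch (standalone): which nodes receive the updated /etc/hosts
# and known_hosts files in LUAddNode.Exec above -- every configured node plus
# the newly added one (unless this is a re-add, in which case it is already in
# the list), minus the master itself, which already holds the files.
def _example_dist_nodes(config_nodes, new_node, master_node, readd):
  dist_nodes = list(config_nodes)
  if not readd:
    dist_nodes.append(new_node)
  if master_node in dist_nodes:
    dist_nodes.remove(master_node)
  return dist_nodes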
1888

    
1889

    
1890
class LUQueryClusterInfo(NoHooksLU):
1891
  """Query cluster configuration.
1892

1893
  """
1894
  _OP_REQP = []
1895
  REQ_MASTER = False
1896
  REQ_BGL = False
1897

    
1898
  def ExpandNames(self):
1899
    self.needed_locks = {}
1900

    
1901
  def CheckPrereq(self):
1902
    """No prerequsites needed for this LU.
1903

1904
    """
1905
    pass
1906

    
1907
  def Exec(self, feedback_fn):
1908
    """Return cluster config.
1909

1910
    """
1911
    cluster = self.cfg.GetClusterInfo()
1912
    result = {
1913
      "software_version": constants.RELEASE_VERSION,
1914
      "protocol_version": constants.PROTOCOL_VERSION,
1915
      "config_version": constants.CONFIG_VERSION,
1916
      "os_api_version": constants.OS_API_VERSION,
1917
      "export_version": constants.EXPORT_VERSION,
1918
      "architecture": (platform.architecture()[0], platform.machine()),
1919
      "name": cluster.cluster_name,
1920
      "master": cluster.master_node,
1921
      "hypervisor_type": cluster.hypervisor,
1922
      "enabled_hypervisors": cluster.enabled_hypervisors,
1923
      "hvparams": cluster.hvparams,
1924
      "beparams": cluster.beparams,
1925
      }
1926

    
1927
    return result
1928

    
1929

    
1930
class LUQueryConfigValues(NoHooksLU):
1931
  """Return configuration values.
1932

1933
  """
1934
  _OP_REQP = []
1935
  REQ_BGL = False
1936

    
1937
  def ExpandNames(self):
1938
    self.needed_locks = {}
1939

    
1940
    static_fields = ["cluster_name", "master_node", "drain_flag"]
1941
    _CheckOutputFields(static=static_fields,
1942
                       dynamic=[],
1943
                       selected=self.op.output_fields)
1944

    
1945
  def CheckPrereq(self):
1946
    """No prerequisites.
1947

1948
    """
1949
    pass
1950

    
1951
  def Exec(self, feedback_fn):
1952
    """Dump a representation of the cluster config to the standard output.
1953

1954
    """
1955
    values = []
1956
    for field in self.op.output_fields:
1957
      if field == "cluster_name":
1958
        entry = self.cfg.GetClusterName()
1959
      elif field == "master_node":
1960
        entry = self.cfg.GetMasterNode()
1961
      elif field == "drain_flag":
1962
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1963
      else:
1964
        raise errors.ParameterError(field)
1965
      values.append(entry)
1966
    return values
1967

    
1968

    
1969
class LUActivateInstanceDisks(NoHooksLU):
1970
  """Bring up an instance's disks.
1971

1972
  """
1973
  _OP_REQP = ["instance_name"]
1974
  REQ_BGL = False
1975

    
1976
  def ExpandNames(self):
1977
    self._ExpandAndLockInstance()
1978
    self.needed_locks[locking.LEVEL_NODE] = []
1979
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1980

    
1981
  def DeclareLocks(self, level):
1982
    if level == locking.LEVEL_NODE:
1983
      self._LockInstancesNodes()
1984

    
1985
  def CheckPrereq(self):
1986
    """Check prerequisites.
1987

1988
    This checks that the instance is in the cluster.
1989

1990
    """
1991
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1992
    assert self.instance is not None, \
1993
      "Cannot retrieve locked instance %s" % self.op.instance_name
1994

    
1995
  def Exec(self, feedback_fn):
1996
    """Activate the disks.
1997

1998
    """
1999
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2000
    if not disks_ok:
2001
      raise errors.OpExecError("Cannot activate block devices")
2002

    
2003
    return disks_info
2004

    
2005

    
2006
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2007
  """Prepare the block devices for an instance.
2008

2009
  This sets up the block devices on all nodes.
2010

2011
  Args:
2012
    instance: a ganeti.objects.Instance object
2013
    ignore_secondaries: if true, errors on secondary nodes won't result
2014
                        in an error return from the function
2015

2016
  Returns:
2017
    false if the operation failed
2018
    list of (host, instance_visible_name, node_visible_name) if the operation
2019
         succeeded with the mapping from node devices to instance devices
2020
  """
2021
  device_info = []
2022
  disks_ok = True
2023
  iname = instance.name
2024
  # With the two passes mechanism we try to reduce the window of
2025
  # opportunity for the race condition of switching DRBD to primary
2026
  # before handshaking occurred, but we do not eliminate it
2027

    
2028
  # The proper fix would be to wait (with some limits) until the
2029
  # connection has been made and drbd transitions from WFConnection
2030
  # into any other network-connected state (Connected, SyncTarget,
2031
  # SyncSource, etc.)
2032

    
2033
  # 1st pass, assemble on all nodes in secondary mode
2034
  for inst_disk in instance.disks:
2035
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2036
      lu.cfg.SetDiskID(node_disk, node)
2037
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2038
      if not result:
2039
        logger.Error("could not prepare block device %s on node %s"
2040
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2041
        if not ignore_secondaries:
2042
          disks_ok = False
2043

    
2044
  # FIXME: race condition on drbd migration to primary
2045

    
2046
  # 2nd pass, do only the primary node
2047
  for inst_disk in instance.disks:
2048
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2049
      if node != instance.primary_node:
2050
        continue
2051
      lu.cfg.SetDiskID(node_disk, node)
2052
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2053
      if not result:
2054
        logger.Error("could not prepare block device %s on node %s"
2055
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2056
        disks_ok = False
2057
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2058

    
2059
  # leave the disks configured for the primary node
2060
  # this is a workaround that would be fixed better by
2061
  # improving the logical/physical id handling
2062
  for disk in instance.disks:
2063
    lu.cfg.SetDiskID(disk, instance.primary_node)
2064

    
2065
  return disks_ok, device_info
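# Illustrative sketch (standalone model, not the real RPC-driven code above):
# the two-pass assembly amounts to "bring every device up in secondary mode
# everywhere first, then activate as primary only on the primary node", which
# narrows the window in which DRBD could be promoted before its peers have
# connected. assemble_fn stands in for rpc.call_blockdev_assemble.
def _example_two_pass_assemble(node_disks, primary_node, assemble_fn):
  """node_disks: list of (node, disk) pairs; returns overall success."""
  ok = True
  for node, disk in node_disks:      # pass 1: secondary mode on all nodes
    ok = assemble_fn(node, disk, False) and ok
  for node, disk in node_disks:      # pass 2: primary mode, primary node only
    if node == primary_node:
      ok = assemble_fn(node, disk, True) and ok
  return ok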
2066

    
2067

    
2068
def _StartInstanceDisks(lu, instance, force):
2069
  """Start the disks of an instance.
2070

2071
  """
2072
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2073
                                           ignore_secondaries=force)
2074
  if not disks_ok:
2075
    _ShutdownInstanceDisks(lu, instance)
2076
    if force is not None and not force:
2077
      logger.Error("If the message above refers to a secondary node,"
2078
                   " you can retry the operation using '--force'.")
2079
    raise errors.OpExecError("Disk consistency error")
2080

    
2081

    
2082
class LUDeactivateInstanceDisks(NoHooksLU):
2083
  """Shutdown an instance's disks.
2084

2085
  """
2086
  _OP_REQP = ["instance_name"]
2087
  REQ_BGL = False
2088

    
2089
  def ExpandNames(self):
2090
    self._ExpandAndLockInstance()
2091
    self.needed_locks[locking.LEVEL_NODE] = []
2092
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2093

    
2094
  def DeclareLocks(self, level):
2095
    if level == locking.LEVEL_NODE:
2096
      self._LockInstancesNodes()
2097

    
2098
  def CheckPrereq(self):
2099
    """Check prerequisites.
2100

2101
    This checks that the instance is in the cluster.
2102

2103
    """
2104
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2105
    assert self.instance is not None, \
2106
      "Cannot retrieve locked instance %s" % self.op.instance_name
2107

    
2108
  def Exec(self, feedback_fn):
2109
    """Deactivate the disks
2110

2111
    """
2112
    instance = self.instance
2113
    _SafeShutdownInstanceDisks(self, instance)
2114

    
2115

    
2116
def _SafeShutdownInstanceDisks(lu, instance):
2117
  """Shutdown block devices of an instance.
2118

2119
  This function checks if an instance is running, before calling
2120
  _ShutdownInstanceDisks.
2121

2122
  """
2123
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2124
                                      [instance.hypervisor])
2125
  ins_l = ins_l[instance.primary_node]
2126
  if not type(ins_l) is list:
2127
    raise errors.OpExecError("Can't contact node '%s'" %
2128
                             instance.primary_node)
2129

    
2130
  if instance.name in ins_l:
2131
    raise errors.OpExecError("Instance is running, can't shutdown"
2132
                             " block devices.")
2133

    
2134
  _ShutdownInstanceDisks(lu, instance)
2135

    
2136

    
2137
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2138
  """Shutdown block devices of an instance.
2139

2140
  This does the shutdown on all nodes of the instance.
2141

2142
  If ignore_primary is true, errors on the primary node are
2143
  ignored.
2144

2145
  """
2146
  result = True
2147
  for disk in instance.disks:
2148
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2149
      lu.cfg.SetDiskID(top_disk, node)
2150
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2151
        logger.Error("could not shutdown block device %s on node %s" %
2152
                     (disk.iv_name, node))
2153
        if not ignore_primary or node != instance.primary_node:
2154
          result = False
2155
  return result
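# Illustrative sketch (standalone): the error policy of _ShutdownInstanceDisks
# in one expression -- a failed shutdown only fails the whole operation when
# it happened on a node whose errors are not being ignored.
def _example_shutdown_failure_counts(node, primary_node, ignore_primary):
  """Return True if a shutdown failure on 'node' should fail the operation."""
  return not ignore_primary or node != primary_node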
2156

    
2157

    
2158
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2159
  """Checks if a node has enough free memory.
2160

2161
  This function checks if a given node has the needed amount of free
2162
  memory. In case the node has less memory or we cannot get the
2163
  information from the node, this function raises an OpPrereqError
2164
  exception.
2165

2166
  @type lu: C{LogicalUnit}
2167
  @param lu: a logical unit from which we get configuration data
2168
  @type node: C{str}
2169
  @param node: the node to check
2170
  @type reason: C{str}
2171
  @param reason: string to use in the error message
2172
  @type requested: C{int}
2173
  @param requested: the amount of memory in MiB to check for
2174
  @type hypervisor: C{str}
2175
  @param hypervisor: the hypervisor to ask for memory stats
2176
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2177
      we cannot check the node
2178

2179
  """
2180
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2181
  if not nodeinfo or not isinstance(nodeinfo, dict):
2182
    raise errors.OpPrereqError("Could not contact node %s for resource"
2183
                             " information" % (node,))
2184

    
2185
  free_mem = nodeinfo[node].get('memory_free')
2186
  if not isinstance(free_mem, int):
2187
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2188
                             " was '%s'" % (node, free_mem))
2189
  if requested > free_mem:
2190
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2191
                             " needed %s MiB, available %s MiB" %
2192
                             (node, reason, requested, free_mem))
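# Hypothetical usage sketch (node name, message and amount are made-up values;
# 'lu' is any LogicalUnit with working cfg/rpc attributes): a prerequisite
# check guarding a memory-consuming operation would look like the call below.
# It returns None on success and raises errors.OpPrereqError otherwise.
def _example_memory_prereq(lu):
  _CheckNodeFreeMemory(lu, "node1.example.com",
                       "starting instance inst1.example.com",
                       512, constants.HT_XEN_HVM)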
2193

    
2194

    
2195
class LUStartupInstance(LogicalUnit):
2196
  """Starts an instance.
2197

2198
  """
2199
  HPATH = "instance-start"
2200
  HTYPE = constants.HTYPE_INSTANCE
2201
  _OP_REQP = ["instance_name", "force"]
2202
  REQ_BGL = False
2203

    
2204
  def ExpandNames(self):
2205
    self._ExpandAndLockInstance()
2206
    self.needed_locks[locking.LEVEL_NODE] = []
2207
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2208

    
2209
  def DeclareLocks(self, level):
2210
    if level == locking.LEVEL_NODE:
2211
      self._LockInstancesNodes()
2212

    
2213
  def BuildHooksEnv(self):
2214
    """Build hooks env.
2215

2216
    This runs on master, primary and secondary nodes of the instance.
2217

2218
    """
2219
    env = {
2220
      "FORCE": self.op.force,
2221
      }
2222
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2223
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2224
          list(self.instance.secondary_nodes))
2225
    return env, nl, nl
2226

    
2227
  def CheckPrereq(self):
2228
    """Check prerequisites.
2229

2230
    This checks that the instance is in the cluster.
2231

2232
    """
2233
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2234
    assert self.instance is not None, \
2235
      "Cannot retrieve locked instance %s" % self.op.instance_name
2236

    
2237
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2238
    # check bridge existence
2239
    _CheckInstanceBridgesExist(self, instance)
2240

    
2241
    _CheckNodeFreeMemory(self, instance.primary_node,
2242
                         "starting instance %s" % instance.name,
2243
                         bep[constants.BE_MEMORY], instance.hypervisor)
2244

    
2245
  def Exec(self, feedback_fn):
2246
    """Start the instance.
2247

2248
    """
2249
    instance = self.instance
2250
    force = self.op.force
2251
    extra_args = getattr(self.op, "extra_args", "")
2252

    
2253
    self.cfg.MarkInstanceUp(instance.name)
2254

    
2255
    node_current = instance.primary_node
2256

    
2257
    _StartInstanceDisks(self, instance, force)
2258

    
2259
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2260
      _ShutdownInstanceDisks(self, instance)
2261
      raise errors.OpExecError("Could not start instance")
2262

    
2263

    
2264
class LURebootInstance(LogicalUnit):
2265
  """Reboot an instance.
2266

2267
  """
2268
  HPATH = "instance-reboot"
2269
  HTYPE = constants.HTYPE_INSTANCE
2270
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2271
  REQ_BGL = False
2272

    
2273
  def ExpandNames(self):
2274
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2275
                                   constants.INSTANCE_REBOOT_HARD,
2276
                                   constants.INSTANCE_REBOOT_FULL]:
2277
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2278
                                  (constants.INSTANCE_REBOOT_SOFT,
2279
                                   constants.INSTANCE_REBOOT_HARD,
2280
                                   constants.INSTANCE_REBOOT_FULL))
2281
    self._ExpandAndLockInstance()
2282
    self.needed_locks[locking.LEVEL_NODE] = []
2283
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2284

    
2285
  def DeclareLocks(self, level):
2286
    if level == locking.LEVEL_NODE:
2287
      # only a full reboot needs the secondary nodes locked as well
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2288
      self._LockInstancesNodes(primary_only=primary_only)
2289

    
2290
  def BuildHooksEnv(self):
2291
    """Build hooks env.
2292

2293
    This runs on master, primary and secondary nodes of the instance.
2294

2295
    """
2296
    env = {
2297
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2298
      }
2299
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2300
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2301
          list(self.instance.secondary_nodes))
2302
    return env, nl, nl
2303

    
2304
  def CheckPrereq(self):
2305
    """Check prerequisites.
2306

2307
    This checks that the instance is in the cluster.
2308

2309
    """
2310
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2311
    assert self.instance is not None, \
2312
      "Cannot retrieve locked instance %s" % self.op.instance_name
2313

    
2314
    # check bridge existence
2315
    _CheckInstanceBridgesExist(self, instance)
2316

    
2317
  def Exec(self, feedback_fn):
2318
    """Reboot the instance.
2319

2320
    """
2321
    instance = self.instance
2322
    ignore_secondaries = self.op.ignore_secondaries
2323
    reboot_type = self.op.reboot_type
2324
    extra_args = getattr(self.op, "extra_args", "")
2325

    
2326
    node_current = instance.primary_node
2327

    
2328
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2329
                       constants.INSTANCE_REBOOT_HARD]:
2330
      if not self.rpc.call_instance_reboot(node_current, instance,
2331
                                           reboot_type, extra_args):
2332
        raise errors.OpExecError("Could not reboot instance")
2333
    else:
2334
      if not self.rpc.call_instance_shutdown(node_current, instance):
2335
        raise errors.OpExecError("could not shutdown instance for full reboot")
2336
      _ShutdownInstanceDisks(self, instance)
2337
      _StartInstanceDisks(self, instance, ignore_secondaries)
2338
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2339
        _ShutdownInstanceDisks(self, instance)
2340
        raise errors.OpExecError("Could not start instance for full reboot")
2341

    
2342
    self.cfg.MarkInstanceUp(instance.name)
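# Illustrative sketch (standalone): the dispatch in LURebootInstance.Exec
# above -- soft and hard reboots are delegated to the node in a single RPC,
# while a full reboot is emulated as shutdown, disk restart and start.
def _example_reboot_plan(reboot_type):
  if reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                     constants.INSTANCE_REBOOT_HARD):
    return ["call_instance_reboot"]
  return ["call_instance_shutdown", "_ShutdownInstanceDisks",
          "_StartInstanceDisks", "call_instance_start"]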
2343

    
2344

    
2345
class LUShutdownInstance(LogicalUnit):
2346
  """Shutdown an instance.
2347

2348
  """
2349
  HPATH = "instance-stop"
2350
  HTYPE = constants.HTYPE_INSTANCE
2351
  _OP_REQP = ["instance_name"]
2352
  REQ_BGL = False
2353

    
2354
  def ExpandNames(self):
2355
    self._ExpandAndLockInstance()
2356
    self.needed_locks[locking.LEVEL_NODE] = []
2357
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2358

    
2359
  def DeclareLocks(self, level):
2360
    if level == locking.LEVEL_NODE:
2361
      self._LockInstancesNodes()
2362

    
2363
  def BuildHooksEnv(self):
2364
    """Build hooks env.
2365

2366
    This runs on master, primary and secondary nodes of the instance.
2367

2368
    """
2369
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2370
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2371
          list(self.instance.secondary_nodes))
2372
    return env, nl, nl
2373

    
2374
  def CheckPrereq(self):
2375
    """Check prerequisites.
2376

2377
    This checks that the instance is in the cluster.
2378

2379
    """
2380
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2381
    assert self.instance is not None, \
2382
      "Cannot retrieve locked instance %s" % self.op.instance_name
2383

    
2384
  def Exec(self, feedback_fn):
2385
    """Shutdown the instance.
2386

2387
    """
2388
    instance = self.instance
2389
    node_current = instance.primary_node
2390
    self.cfg.MarkInstanceDown(instance.name)
2391
    if not self.rpc.call_instance_shutdown(node_current, instance):
2392
      logger.Error("could not shutdown instance")
2393

    
2394
    _ShutdownInstanceDisks(self, instance)
2395

    
2396

    
2397
class LUReinstallInstance(LogicalUnit):
2398
  """Reinstall an instance.
2399

2400
  """
2401
  HPATH = "instance-reinstall"
2402
  HTYPE = constants.HTYPE_INSTANCE
2403
  _OP_REQP = ["instance_name"]
2404
  REQ_BGL = False
2405

    
2406
  def ExpandNames(self):
2407
    self._ExpandAndLockInstance()
2408
    self.needed_locks[locking.LEVEL_NODE] = []
2409
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2410

    
2411
  def DeclareLocks(self, level):
2412
    if level == locking.LEVEL_NODE:
2413
      self._LockInstancesNodes()
2414

    
2415
  def BuildHooksEnv(self):
2416
    """Build hooks env.
2417

2418
    This runs on master, primary and secondary nodes of the instance.
2419

2420
    """
2421
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2422
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2423
          list(self.instance.secondary_nodes))
2424
    return env, nl, nl
2425

    
2426
  def CheckPrereq(self):
2427
    """Check prerequisites.
2428

2429
    This checks that the instance is in the cluster and is not running.
2430

2431
    """
2432
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2433
    assert instance is not None, \
2434
      "Cannot retrieve locked instance %s" % self.op.instance_name
2435

    
2436
    if instance.disk_template == constants.DT_DISKLESS:
2437
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2438
                                 self.op.instance_name)
2439
    if instance.status != "down":
2440
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2441
                                 self.op.instance_name)
2442
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2443
                                              instance.name,
2444
                                              instance.hypervisor)
2445
    if remote_info:
2446
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2447
                                 (self.op.instance_name,
2448
                                  instance.primary_node))
2449

    
2450
    self.op.os_type = getattr(self.op, "os_type", None)
2451
    if self.op.os_type is not None:
2452
      # OS verification
2453
      pnode = self.cfg.GetNodeInfo(
2454
        self.cfg.ExpandNodeName(instance.primary_node))
2455
      if pnode is None:
2456
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2457
                                   instance.primary_node)
2458
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2459
      if not os_obj:
2460
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2461
                                   " primary node"  % self.op.os_type)
2462

    
2463
    self.instance = instance
2464

    
2465
  def Exec(self, feedback_fn):
2466
    """Reinstall the instance.
2467

2468
    """
2469
    inst = self.instance
2470

    
2471
    if self.op.os_type is not None:
2472
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2473
      inst.os = self.op.os_type
2474
      self.cfg.Update(inst)
2475

    
2476
    _StartInstanceDisks(self, inst, None)
2477
    try:
2478
      feedback_fn("Running the instance OS create scripts...")
2479
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2480
                                           "sda", "sdb"):
2481
        raise errors.OpExecError("Could not install OS for instance %s"
2482
                                 " on node %s" %
2483
                                 (inst.name, inst.primary_node))
2484
    finally:
2485
      _ShutdownInstanceDisks(self, inst)
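# Illustrative sketch (standalone): the pattern used by LUReinstallInstance
# (and by LURenameInstance below) -- the instance's disks are only kept
# assembled for the duration of the OS script and are always shut down again,
# even if the script fails.
def _example_with_assembled_disks(start_disks_fn, shutdown_disks_fn, work_fn):
  start_disks_fn()
  try:
    return work_fn()
  finally:
    shutdown_disks_fn()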
2486

    
2487

    
2488
class LURenameInstance(LogicalUnit):
2489
  """Rename an instance.
2490

2491
  """
2492
  HPATH = "instance-rename"
2493
  HTYPE = constants.HTYPE_INSTANCE
2494
  _OP_REQP = ["instance_name", "new_name"]
2495

    
2496
  def BuildHooksEnv(self):
2497
    """Build hooks env.
2498

2499
    This runs on master, primary and secondary nodes of the instance.
2500

2501
    """
2502
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2503
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2504
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2505
          list(self.instance.secondary_nodes))
2506
    return env, nl, nl
2507

    
2508
  def CheckPrereq(self):
2509
    """Check prerequisites.
2510

2511
    This checks that the instance is in the cluster and is not running.
2512

2513
    """
2514
    instance = self.cfg.GetInstanceInfo(
2515
      self.cfg.ExpandInstanceName(self.op.instance_name))
2516
    if instance is None:
2517
      raise errors.OpPrereqError("Instance '%s' not known" %
2518
                                 self.op.instance_name)
2519
    if instance.status != "down":
2520
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2521
                                 self.op.instance_name)
2522
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2523
                                              instance.name,
2524
                                              instance.hypervisor)
2525
    if remote_info:
2526
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2527
                                 (self.op.instance_name,
2528
                                  instance.primary_node))
2529
    self.instance = instance
2530

    
2531
    # new name verification
2532
    name_info = utils.HostInfo(self.op.new_name)
2533

    
2534
    self.op.new_name = new_name = name_info.name
2535
    instance_list = self.cfg.GetInstanceList()
2536
    if new_name in instance_list:
2537
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2538
                                 new_name)
2539

    
2540
    if not getattr(self.op, "ignore_ip", False):
2541
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2542
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2543
                                   (name_info.ip, new_name))
2544

    
2545

    
2546
  def Exec(self, feedback_fn):
2547
    """Reinstall the instance.
2548

2549
    """
2550
    inst = self.instance
2551
    old_name = inst.name
2552

    
2553
    if inst.disk_template == constants.DT_FILE:
2554
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2555

    
2556
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2557
    # Change the instance lock. This is definitely safe while we hold the BGL
2558
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2559
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2560

    
2561
    # re-read the instance from the configuration after rename
2562
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2563

    
2564
    if inst.disk_template == constants.DT_FILE:
2565
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2566
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2567
                                                     old_file_storage_dir,
2568
                                                     new_file_storage_dir)
2569

    
2570
      if not result:
2571
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2572
                                 " directory '%s' to '%s' (but the instance"
2573
                                 " has been renamed in Ganeti)" % (
2574
                                 inst.primary_node, old_file_storage_dir,
2575
                                 new_file_storage_dir))
2576

    
2577
      if not result[0]:
2578
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2579
                                 " (but the instance has been renamed in"
2580
                                 " Ganeti)" % (old_file_storage_dir,
2581
                                               new_file_storage_dir))
2582

    
2583
    _StartInstanceDisks(self, inst, None)
2584
    try:
2585
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2586
                                               old_name):
2587
        msg = ("Could not run OS rename script for instance %s on node %s"
2588
               " (but the instance has been renamed in Ganeti)" %
2589
               (inst.name, inst.primary_node))
2590
        logger.Error(msg)
2591
    finally:
2592
      _ShutdownInstanceDisks(self, inst)
2593

    
2594

    
2595
class LURemoveInstance(LogicalUnit):
2596
  """Remove an instance.
2597

2598
  """
2599
  HPATH = "instance-remove"
2600
  HTYPE = constants.HTYPE_INSTANCE
2601
  _OP_REQP = ["instance_name", "ignore_failures"]
2602
  REQ_BGL = False
2603

    
2604
  def ExpandNames(self):
2605
    self._ExpandAndLockInstance()
2606
    self.needed_locks[locking.LEVEL_NODE] = []
2607
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2608

    
2609
  def DeclareLocks(self, level):
2610
    if level == locking.LEVEL_NODE:
2611
      self._LockInstancesNodes()
2612

    
2613
  def BuildHooksEnv(self):
2614
    """Build hooks env.
2615

2616
    This runs on master, primary and secondary nodes of the instance.
2617

2618
    """
2619
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2620
    nl = [self.cfg.GetMasterNode()]
2621
    return env, nl, nl
2622

    
2623
  def CheckPrereq(self):
2624
    """Check prerequisites.
2625

2626
    This checks that the instance is in the cluster.
2627

2628
    """
2629
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2630
    assert self.instance is not None, \
2631
      "Cannot retrieve locked instance %s" % self.op.instance_name
2632

    
2633
  def Exec(self, feedback_fn):
2634
    """Remove the instance.
2635

2636
    """
2637
    instance = self.instance
2638
    logger.Info("shutting down instance %s on node %s" %
2639
                (instance.name, instance.primary_node))
2640

    
2641
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2642
      if self.op.ignore_failures:
2643
        feedback_fn("Warning: can't shutdown instance")
2644
      else:
2645
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2646
                                 (instance.name, instance.primary_node))
2647

    
2648
    logger.Info("removing block devices for instance %s" % instance.name)
2649

    
2650
    if not _RemoveDisks(self, instance):
2651
      if self.op.ignore_failures:
2652
        feedback_fn("Warning: can't remove instance's disks")
2653
      else:
2654
        raise errors.OpExecError("Can't remove instance's disks")
2655

    
2656
    logger.Info("removing instance %s out of cluster config" % instance.name)
2657

    
2658
    self.cfg.RemoveInstance(instance.name)
2659
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
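# Illustrative sketch (standalone): the ignore_failures policy used twice in
# LURemoveInstance.Exec above -- a failed step either degrades to a warning
# or aborts the removal.
def _example_handle_removal_failure(ok, ignore_failures, feedback_fn, message):
  if ok:
    return
  if ignore_failures:
    feedback_fn("Warning: %s" % message)
  else:
    raise errors.OpExecError(message)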
2660

    
2661

    
2662
class LUQueryInstances(NoHooksLU):
2663
  """Logical unit for querying instances.
2664

2665
  """
2666
  _OP_REQP = ["output_fields", "names"]
2667
  REQ_BGL = False
2668

    
2669
  def ExpandNames(self):
2670
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2671
    hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2672
    bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2673
    self.static_fields = frozenset([
2674
      "name", "os", "pnode", "snodes",
2675
      "admin_state", "admin_ram",
2676
      "disk_template", "ip", "mac", "bridge",
2677
      "sda_size", "sdb_size", "vcpus", "tags",
2678
      "network_port",
2679
      "serial_no", "hypervisor", "hvparams",
2680
      ] + hvp + bep)
2681

    
2682
    _CheckOutputFields(static=self.static_fields,
2683
                       dynamic=self.dynamic_fields,
2684
                       selected=self.op.output_fields)
2685

    
2686
    self.needed_locks = {}
2687
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2688
    self.share_locks[locking.LEVEL_NODE] = 1
2689

    
2690
    if self.op.names:
2691
      self.wanted = _GetWantedInstances(self, self.op.names)
2692
    else:
2693
      self.wanted = locking.ALL_SET
2694

    
2695
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2696
    if self.do_locking:
2697
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2698
      self.needed_locks[locking.LEVEL_NODE] = []
2699
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2700

    
2701
  def DeclareLocks(self, level):
2702
    if level == locking.LEVEL_NODE and self.do_locking:
2703
      self._LockInstancesNodes()
2704

    
2705
  def CheckPrereq(self):
2706
    """Check prerequisites.
2707

2708
    """
2709
    pass
2710

    
2711
  def Exec(self, feedback_fn):
2712
    """Computes the list of nodes and their attributes.
2713

2714
    """
2715
    all_info = self.cfg.GetAllInstancesInfo()
2716
    if self.do_locking:
2717
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2718
    elif self.wanted != locking.ALL_SET:
2719
      instance_names = self.wanted
2720
      missing = set(instance_names).difference(all_info.keys())
2721
      if missing:
2722
        raise errors.OpExecError(
2723
          "Some instances were removed before retrieving their data: %s"
2724
          % missing)
2725
    else:
2726
      instance_names = all_info.keys()
2727

    
2728
    instance_names = utils.NiceSort(instance_names)
2729
    instance_list = [all_info[iname] for iname in instance_names]
2730

    
2731
    # begin data gathering
2732

    
2733
    nodes = frozenset([inst.primary_node for inst in instance_list])
2734
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2735

    
2736
    bad_nodes = []
2737
    if self.dynamic_fields.intersection(self.op.output_fields):
2738
      live_data = {}
2739
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2740
      for name in nodes:
2741
        result = node_data[name]
2742
        if result:
2743
          live_data.update(result)
2744
        elif result == False:
2745
          bad_nodes.append(name)
2746
        # else no instance is alive
2747
    else:
2748
      live_data = dict([(name, {}) for name in instance_names])
2749

    
2750
    # end data gathering
2751

    
2752
    HVPREFIX = "hv/"
2753
    BEPREFIX = "be/"
2754
    output = []
2755
    for instance in instance_list:
2756
      iout = []
2757
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2758
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2759
      for field in self.op.output_fields:
2760
        if field == "name":
2761
          val = instance.name
2762
        elif field == "os":
2763
          val = instance.os
2764
        elif field == "pnode":
2765
          val = instance.primary_node
2766
        elif field == "snodes":
2767
          val = list(instance.secondary_nodes)
2768
        elif field == "admin_state":
2769
          val = (instance.status != "down")
2770
        elif field == "oper_state":
2771
          if instance.primary_node in bad_nodes:
2772
            val = None
2773
          else:
2774
            val = bool(live_data.get(instance.name))
2775
        elif field == "status":
2776
          if instance.primary_node in bad_nodes:
2777
            val = "ERROR_nodedown"
2778
          else:
2779
            running = bool(live_data.get(instance.name))
2780
            if running:
2781
              if instance.status != "down":
2782
                val = "running"
2783
              else:
2784
                val = "ERROR_up"
2785
            else:
2786
              if instance.status != "down":
2787
                val = "ERROR_down"
2788
              else:
2789
                val = "ADMIN_down"
2790
        elif field == "oper_ram":
2791
          if instance.primary_node in bad_nodes:
2792
            val = None
2793
          elif instance.name in live_data:
2794
            val = live_data[instance.name].get("memory", "?")
2795
          else:
2796
            val = "-"
2797
        elif field == "disk_template":
2798
          val = instance.disk_template
2799
        elif field == "ip":
2800
          val = instance.nics[0].ip
2801
        elif field == "bridge":
2802
          val = instance.nics[0].bridge
2803
        elif field == "mac":
2804
          val = instance.nics[0].mac
2805
        elif field == "sda_size" or field == "sdb_size":
2806
          disk = instance.FindDisk(field[:3])
2807
          if disk is None:
2808
            val = None
2809
          else:
2810
            val = disk.size
2811
        elif field == "tags":
2812
          val = list(instance.GetTags())
2813
        elif field == "serial_no":
2814
          val = instance.serial_no
2815
        elif field == "network_port":
2816
          val = instance.network_port
2817
        elif field == "hypervisor":
2818
          val = instance.hypervisor
2819
        elif field == "hvparams":
2820
          val = i_hv
2821
        elif (field.startswith(HVPREFIX) and
2822
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2823
          val = i_hv.get(field[len(HVPREFIX):], None)
2824
        elif field == "beparams":
2825
          val = i_be
2826
        elif (field.startswith(BEPREFIX) and
2827
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2828
          val = i_be.get(field[len(BEPREFIX):], None)
2829
        else:
2830
          raise errors.ParameterError(field)
2831
        iout.append(val)
2832
      output.append(iout)
2833

    
2834
    return output
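# Illustrative sketch (standalone, simplified): how the "hv/*" and "be/*"
# columns above are resolved -- the prefix selects the filled parameter
# dictionary and the remainder of the field name is the key inside it.
def _example_resolve_prefixed_field(field, i_hv, i_be):
  """i_hv/i_be: filled hypervisor/backend parameter dictionaries."""
  if field.startswith("hv/"):
    return i_hv.get(field[len("hv/"):], None)
  if field.startswith("be/"):
    return i_be.get(field[len("be/"):], None)
  raise errors.ParameterError(field)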
2835

    
2836

    
2837
class LUFailoverInstance(LogicalUnit):
2838
  """Failover an instance.
2839

2840
  """
2841
  HPATH = "instance-failover"
2842
  HTYPE = constants.HTYPE_INSTANCE
2843
  _OP_REQP = ["instance_name", "ignore_consistency"]
2844
  REQ_BGL = False
2845

    
2846
  def ExpandNames(self):
2847
    self._ExpandAndLockInstance()
2848
    self.needed_locks[locking.LEVEL_NODE] = []
2849
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2850

    
2851
  def DeclareLocks(self, level):
2852
    if level == locking.LEVEL_NODE:
2853
      self._LockInstancesNodes()
2854

    
2855
  def BuildHooksEnv(self):
2856
    """Build hooks env.
2857

2858
    This runs on master, primary and secondary nodes of the instance.
2859

2860
    """
2861
    env = {
2862
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2863
      }
2864
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2865
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2866
    return env, nl, nl
2867

    
2868
  def CheckPrereq(self):
2869
    """Check prerequisites.
2870

2871
    This checks that the instance is in the cluster.
2872

2873
    """
2874
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2875
    assert self.instance is not None, \
2876
      "Cannot retrieve locked instance %s" % self.op.instance_name
2877

    
2878
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2879
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2880
      raise errors.OpPrereqError("Instance's disk layout is not"
2881
                                 " network mirrored, cannot failover.")
2882

    
2883
    secondary_nodes = instance.secondary_nodes
2884
    if not secondary_nodes:
2885
      raise errors.ProgrammerError("no secondary node but using "
2886
                                   "a mirrored disk template")
2887

    
2888
    target_node = secondary_nodes[0]
2889
    # check memory requirements on the secondary node
2890
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2891
                         instance.name, bep[constants.BE_MEMORY],
2892
                         instance.hypervisor)
2893

    
2894
    # check bridge existence
2895
    brlist = [nic.bridge for nic in instance.nics]
2896
    if not self.rpc.call_bridges_exist(target_node, brlist):
2897
      raise errors.OpPrereqError("One or more target bridges %s does not"
2898
                                 " exist on destination node '%s'" %
2899
                                 (brlist, target_node))
2900

    
2901
  def Exec(self, feedback_fn):
2902
    """Failover an instance.
2903

2904
    The failover is done by shutting it down on its present node and
2905
    starting it on the secondary.
2906

2907
    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate logical volume names for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                      ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }
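  # Example (illustrative figures): a drbd8 instance with a 10240 MB disk
  # and 1024 MB swap needs 10240 + 1024 + 256 = 11520 MB of free space in
  # the volume group on each involved node.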

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
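  # hvinfo maps each node name to its reply; per the checks below, a valid
  # reply is a (success, message) tuple/list, anything else is treated as
  # an RPC failure for that node.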
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_size",
              "disk_template", "swap_size", "mode", "start",
              "wait_for_sync", "ip_check", "mac",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist
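    # Locking note: with an iallocator the target nodes are not known yet,
    # so all node locks are acquired above; with explicitly given nodes
    # only the primary (and optional secondary) are locked.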

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image,
                                                cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
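      # Renaming scheme used below (illustrative): an LV named
      # "<unique-id>.sda_data" becomes "<unique-id>.sda_data_replaced-<time_t>"
      # before the freshly created LV takes over the original name.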
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))


    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev, new_minor in zip(instance.disks, minors):
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
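      # Reminder: a DRBD8 logical_id is the tuple (node_a, node_b, port,
      # minor_a, minor_b, shared_secret); only the old secondary and its
      # minor are replaced here, port and secret are reused.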
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
4271
    """Execute disk replacement.
4272

4273
    This dispatches the disk replacement to the appropriate handler.
4274

4275
    """
4276
    instance = self.instance
4277

    
4278
    # Activate the instance disks if we're replacing them on a down instance
4279
    if instance.status == "down":
4280
      _StartInstanceDisks(self, instance, True)
4281

    
4282
    if instance.disk_template == constants.DT_DRBD8:
4283
      if self.op.remote_node is None:
4284
        fn = self._ExecD8DiskOnly
4285
      else:
4286
        fn = self._ExecD8Secondary
4287
    else:
4288
      raise errors.ProgrammerError("Unhandled disk replacement case")
4289

    
4290
    ret = fn(feedback_fn)
4291

    
4292
    # Deactivate the instance disks if we're replacing them on a down instance
4293
    if instance.status == "down":
4294
      _SafeShutdownInstanceDisks(self, instance)
4295

    
4296
    return ret
4297

    
4298

    
4299
class LUGrowDisk(LogicalUnit):
4300
  """Grow a disk of an instance.
4301

4302
  """
4303
  HPATH = "disk-grow"
4304
  HTYPE = constants.HTYPE_INSTANCE
4305
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4306
  REQ_BGL = False
4307

    
4308
  def ExpandNames(self):
4309
    self._ExpandAndLockInstance()
4310
    self.needed_locks[locking.LEVEL_NODE] = []
4311
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4312

    
4313
  def DeclareLocks(self, level):
4314
    if level == locking.LEVEL_NODE:
4315
      self._LockInstancesNodes()
4316

    
4317
  def BuildHooksEnv(self):
4318
    """Build hooks env.
4319

4320
    This runs on the master, the primary and all the secondaries.
4321

4322
    """
4323
    env = {
4324
      "DISK": self.op.disk,
4325
      "AMOUNT": self.op.amount,
4326
      }
4327
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4328
    nl = [
4329
      self.cfg.GetMasterNode(),
4330
      self.instance.primary_node,
4331
      ]
4332
    return env, nl, nl
4333

    
4334
  def CheckPrereq(self):
4335
    """Check prerequisites.
4336

4337
    This checks that the instance is in the cluster.
4338

4339
    """
4340
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4341
    assert instance is not None, \
4342
      "Cannot retrieve locked instance %s" % self.op.instance_name
4343

    
4344
    self.instance = instance
4345

    
4346
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4347
      raise errors.OpPrereqError("Instance's disk layout does not support"
4348
                                 " growing.")
4349

    
4350
    if instance.FindDisk(self.op.disk) is None:
4351
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4352
                                 (self.op.disk, instance.name))
4353

    
4354
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4355
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4356
                                       instance.hypervisor)
4357
    for node in nodenames:
4358
      info = nodeinfo.get(node, None)
4359
      if not info:
4360
        raise errors.OpPrereqError("Cannot get current information"
4361
                                   " from node '%s'" % node)
4362
      vg_free = info.get('vg_free', None)
4363
      if not isinstance(vg_free, int):
4364
        raise errors.OpPrereqError("Can't compute free disk space on"
4365
                                   " node %s" % node)
4366
      if self.op.amount > info['vg_free']:
4367
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4368
                                   " %d MiB available, %d MiB required" %
4369
                                   (node, info['vg_free'], self.op.amount))
4370

    
4371
  def Exec(self, feedback_fn):
4372
    """Execute disk grow.
4373

4374
    """
4375
    instance = self.instance
4376
    disk = instance.FindDisk(self.op.disk)
4377
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4378
      self.cfg.SetDiskID(disk, node)
4379
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4380
      if (not result or not isinstance(result, (list, tuple)) or
4381
          len(result) != 2):
4382
        raise errors.OpExecError("grow request failed to node %s" % node)
4383
      elif not result[0]:
4384
        raise errors.OpExecError("grow request failed to node %s: %s" %
4385
                                 (node, result[1]))
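      # Given the checks above, a successful call_blockdev_grow reply is a
      # two-element sequence whose first item is the status, e.g. (True, "")
      # on success or (False, "not enough free space") on failure (the
      # message text is illustrative only).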
4386
    disk.RecordGrow(self.op.amount)
4387
    self.cfg.Update(instance)
4388
    if self.op.wait_for_sync:
4389
      disk_abort = not _WaitForSync(self, instance)
4390
      if disk_abort:
4391
        logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4392
                     " Please check the instance.")
4393

    
4394

    
4395
class LUQueryInstanceData(NoHooksLU):
4396
  """Query runtime instance data.
4397

4398
  """
4399
  _OP_REQP = ["instances", "static"]
4400
  REQ_BGL = False
4401

    
4402
  def ExpandNames(self):
4403
    self.needed_locks = {}
4404
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4405

    
4406
    if not isinstance(self.op.instances, list):
4407
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4408

    
4409
    if self.op.instances:
4410
      self.wanted_names = []
4411
      for name in self.op.instances:
4412
        full_name = self.cfg.ExpandInstanceName(name)
4413
        if full_name is None:
4414
          raise errors.OpPrereqError("Instance '%s' not known" %
4415
                                     name)
4416
        self.wanted_names.append(full_name)
4417
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4418
    else:
4419
      self.wanted_names = None
4420
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4421

    
4422
    self.needed_locks[locking.LEVEL_NODE] = []
4423
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4424

    
4425
  def DeclareLocks(self, level):
4426
    if level == locking.LEVEL_NODE:
4427
      self._LockInstancesNodes()
4428

    
4429
  def CheckPrereq(self):
4430
    """Check prerequisites.
4431

4432
    This only checks the optional instance list against the existing names.
4433

4434
    """
4435
    if self.wanted_names is None:
4436
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4437

    
4438
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4439
                             in self.wanted_names]
4440
    return
4441

    
4442
  def _ComputeDiskStatus(self, instance, snode, dev):
4443
    """Compute block device status.
4444

4445
    """
4446
    static = self.op.static
4447
    if not static:
4448
      self.cfg.SetDiskID(dev, instance.primary_node)
4449
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4450
    else:
4451
      dev_pstatus = None
4452

    
4453
    if dev.dev_type in constants.LDS_DRBD:
4454
      # we change the snode then (otherwise we use the one passed in)
4455
      if dev.logical_id[0] == instance.primary_node:
4456
        snode = dev.logical_id[1]
4457
      else:
4458
        snode = dev.logical_id[0]
4459

    
4460
    if snode and not static:
4461
      self.cfg.SetDiskID(dev, snode)
4462
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4463
    else:
4464
      dev_sstatus = None
4465

    
4466
    if dev.children:
4467
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4468
                      for child in dev.children]
4469
    else:
4470
      dev_children = []
4471

    
4472
    data = {
4473
      "iv_name": dev.iv_name,
4474
      "dev_type": dev.dev_type,
4475
      "logical_id": dev.logical_id,
4476
      "physical_id": dev.physical_id,
4477
      "pstatus": dev_pstatus,
4478
      "sstatus": dev_sstatus,
4479
      "children": dev_children,
4480
      }
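    # For a DRBD8 disk this produces a nested structure, roughly
    #   {"iv_name": "sda", "dev_type": constants.LD_DRBD8, ...,
    #    "children": [<status dict of each backing LV>, ...]}
    # with the backing volumes reported recursively under "children"
    # (the sketch above is illustrative, not a literal result).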
4481

    
4482
    return data
4483

    
4484
  def Exec(self, feedback_fn):
4485
    """Gather and return data"""
4486
    result = {}
4487

    
4488
    cluster = self.cfg.GetClusterInfo()
4489

    
4490
    for instance in self.wanted_instances:
4491
      if not self.op.static:
4492
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4493
                                                  instance.name,
4494
                                                  instance.hypervisor)
4495
        if remote_info and "state" in remote_info:
4496
          remote_state = "up"
4497
        else:
4498
          remote_state = "down"
4499
      else:
4500
        remote_state = None
4501
      if instance.status == "down":
4502
        config_state = "down"
4503
      else:
4504
        config_state = "up"
4505

    
4506
      disks = [self._ComputeDiskStatus(instance, None, device)
4507
               for device in instance.disks]
4508

    
4509
      idict = {
4510
        "name": instance.name,
4511
        "config_state": config_state,
4512
        "run_state": remote_state,
4513
        "pnode": instance.primary_node,
4514
        "snodes": instance.secondary_nodes,
4515
        "os": instance.os,
4516
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4517
        "disks": disks,
4518
        "hypervisor": instance.hypervisor,
4519
        "network_port": instance.network_port,
4520
        "hv_instance": instance.hvparams,
4521
        "hv_actual": cluster.FillHV(instance),
4522
        "be_instance": instance.beparams,
4523
        "be_actual": cluster.FillBE(instance),
4524
        }
4525

    
4526
      result[instance.name] = idict
4527

    
4528
    return result
4529

    
4530

    
4531
class LUSetInstanceParams(LogicalUnit):
4532
  """Modifies an instances's parameters.
4533

4534
  """
4535
  HPATH = "instance-modify"
4536
  HTYPE = constants.HTYPE_INSTANCE
4537
  _OP_REQP = ["instance_name", "hvparams"]
4538
  REQ_BGL = False
4539

    
4540
  def ExpandNames(self):
4541
    self._ExpandAndLockInstance()
4542
    self.needed_locks[locking.LEVEL_NODE] = []
4543
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4544

    
4545

    
4546
  def DeclareLocks(self, level):
4547
    if level == locking.LEVEL_NODE:
4548
      self._LockInstancesNodes()
4549

    
4550
  def BuildHooksEnv(self):
4551
    """Build hooks env.
4552

4553
    This runs on the master, primary and secondaries.
4554

4555
    """
4556
    args = dict()
4557
    if constants.BE_MEMORY in self.be_new:
4558
      args['memory'] = self.be_new[constants.BE_MEMORY]
4559
    if constants.BE_VCPUS in self.be_new:
4560
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4561
    if self.do_ip or self.do_bridge or self.mac:
4562
      if self.do_ip:
4563
        ip = self.ip
4564
      else:
4565
        ip = self.instance.nics[0].ip
4566
      if self.bridge:
4567
        bridge = self.bridge
4568
      else:
4569
        bridge = self.instance.nics[0].bridge
4570
      if self.mac:
4571
        mac = self.mac
4572
      else:
4573
        mac = self.instance.nics[0].mac
4574
      args['nics'] = [(ip, bridge, mac)]
4575
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4576
    nl = [self.cfg.GetMasterNode(),
4577
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4578
    return env, nl, nl
4579

    
4580
  def CheckPrereq(self):
4581
    """Check prerequisites.
4582

4583
    This only checks the instance list against the existing names.
4584

4585
    """
4586
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4587
    # a separate CheckArguments function, if we implement one, so the operation
4588
    # can be aborted without waiting for any lock, should it have an error...
4589
    self.ip = getattr(self.op, "ip", None)
4590
    self.mac = getattr(self.op, "mac", None)
4591
    self.bridge = getattr(self.op, "bridge", None)
4592
    self.kernel_path = getattr(self.op, "kernel_path", None)
4593
    self.initrd_path = getattr(self.op, "initrd_path", None)
4594
    self.force = getattr(self.op, "force", None)
4595
    all_parms = [self.ip, self.bridge, self.mac]
4596
    if (all_parms.count(None) == len(all_parms) and
4597
        not self.op.hvparams and
4598
        not self.op.beparams):
4599
      raise errors.OpPrereqError("No changes submitted")
4600
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4601
      val = self.op.beparams.get(item, None)
4602
      if val is not None:
4603
        try:
4604
          val = int(val)
4605
        except ValueError, err:
4606
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4607
        self.op.beparams[item] = val
4608
    if self.ip is not None:
4609
      self.do_ip = True
4610
      if self.ip.lower() == "none":
4611
        self.ip = None
4612
      else:
4613
        if not utils.IsValidIP(self.ip):
4614
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4615
    else:
4616
      self.do_ip = False
4617
    self.do_bridge = (self.bridge is not None)
4618
    if self.mac is not None:
4619
      if self.cfg.IsMacInUse(self.mac):
4620
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4621
                                   self.mac)
4622
      if not utils.IsValidMac(self.mac):
4623
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4624

    
4625
    # checking the new params on the primary/secondary nodes
4626

    
4627
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4628
    assert self.instance is not None, \
4629
      "Cannot retrieve locked instance %s" % self.op.instance_name
4630
    pnode = self.instance.primary_node
4631
    nodelist = [pnode]
4632
    nodelist.extend(instance.secondary_nodes)
4633

    
4634
    # hvparams processing
4635
    if self.op.hvparams:
4636
      i_hvdict = copy.deepcopy(instance.hvparams)
4637
      for key, val in self.op.hvparams.iteritems():
4638
        if val is None:
4639
          try:
4640
            del i_hvdict[key]
4641
          except KeyError:
4642
            pass
4643
        else:
4644
          i_hvdict[key] = val
4645
      cluster = self.cfg.GetClusterInfo()
4646
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4647
                                i_hvdict)
4648
      # local check
4649
      hypervisor.GetHypervisor(
4650
        instance.hypervisor).CheckParameterSyntax(hv_new)
4651
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4652
      self.hv_new = hv_new # the new actual values
4653
      self.hv_inst = i_hvdict # the new dict (without defaults)
4654
    else:
4655
      self.hv_new = self.hv_inst = {}
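    # Note on the merge above: FillDict() overlays the per-instance overrides
    # on top of the cluster-level defaults, so hv_new holds the effective
    # values while hv_inst keeps only what was explicitly set; passing None
    # for a key in the opcode drops the override and reverts to the default.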
4656

    
4657
    # beparams processing
4658
    if self.op.beparams:
4659
      i_bedict = copy.deepcopy(instance.beparams)
4660
      for key, val in self.op.beparams.iteritems():
4661
        if val is None:
4662
          try:
4663
            del i_bedict[key]
4664
          except KeyError:
4665
            pass
4666
        else:
4667
          i_bedict[key] = val
4668
      cluster = self.cfg.GetClusterInfo()
4669
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4670
                                i_bedict)
4671
      self.be_new = be_new # the new actual values
4672
      self.be_inst = i_bedict # the new dict (without defaults)
4673
    else:
4674
      self.be_new = self.be_inst = {}
4675

    
4676
    self.warn = []
4677

    
4678
    if constants.BE_MEMORY in self.op.beparams and not self.force:
4679
      mem_check_list = [pnode]
4680
      if be_new[constants.BE_AUTO_BALANCE]:
4681
        # either we changed auto_balance to yes or it was from before
4682
        mem_check_list.extend(instance.secondary_nodes)
4683
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4684
                                                  instance.hypervisor)
4685
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4686
                                         instance.hypervisor)
4687

    
4688
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4689
        # Assume the primary node is unreachable and go ahead
4690
        self.warn.append("Can't get info from primary node %s" % pnode)
4691
      else:
4692
        if instance_info:
4693
          current_mem = instance_info['memory']
4694
        else:
4695
          # Assume instance not running
4696
          # (there is a slight race condition here, but it's not very probable,
4697
          # and we have no other way to check)
4698
          current_mem = 0
4699
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4700
                    nodeinfo[pnode]['memory_free'])
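        # Worked example with illustrative numbers: requesting 2048 MB while
        # the instance currently uses 512 MB and the node reports 1024 MB
        # free gives miss_mem = 2048 - 512 - 1024 = 512 > 0, so the change
        # would be refused.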
4701
        if miss_mem > 0:
4702
          raise errors.OpPrereqError("This change will prevent the instance"
4703
                                     " from starting, due to %d MB of memory"
4704
                                     " missing on its primary node" % miss_mem)
4705

    
4706
      if be_new[constants.BE_AUTO_BALANCE]:
4707
        for node in instance.secondary_nodes:
4708
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4709
            self.warn.append("Can't get info from secondary node %s" % node)
4710
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4711
            self.warn.append("Not enough memory to failover instance to"
4712
                             " secondary node %s" % node)
4713

    
4714
    return
4715

    
4716
  def Exec(self, feedback_fn):
4717
    """Modifies an instance.
4718

4719
    All parameters take effect only at the next restart of the instance.
4720
    """
4721
    # Process here the warnings from CheckPrereq, as we don't have a
4722
    # feedback_fn there.
4723
    for warn in self.warn:
4724
      feedback_fn("WARNING: %s" % warn)
4725

    
4726
    result = []
4727
    instance = self.instance
4728
    if self.do_ip:
4729
      instance.nics[0].ip = self.ip
4730
      result.append(("ip", self.ip))
4731
    if self.bridge:
4732
      instance.nics[0].bridge = self.bridge
4733
      result.append(("bridge", self.bridge))
4734
    if self.mac:
4735
      instance.nics[0].mac = self.mac
4736
      result.append(("mac", self.mac))
4737
    if self.op.hvparams:
4738
      instance.hvparams = self.hv_new
4739
      for key, val in self.op.hvparams.iteritems():
4740
        result.append(("hv/%s" % key, val))
4741
    if self.op.beparams:
4742
      instance.beparams = self.be_inst
4743
      for key, val in self.op.beparams.iteritems():
4744
        result.append(("be/%s" % key, val))
4745

    
4746
    self.cfg.Update(instance)
4747

    
4748
    return result
4749

    
4750

    
4751
class LUQueryExports(NoHooksLU):
4752
  """Query the exports list
4753

4754
  """
4755
  _OP_REQP = ['nodes']
4756
  REQ_BGL = False
4757

    
4758
  def ExpandNames(self):
4759
    self.needed_locks = {}
4760
    self.share_locks[locking.LEVEL_NODE] = 1
4761
    if not self.op.nodes:
4762
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4763
    else:
4764
      self.needed_locks[locking.LEVEL_NODE] = \
4765
        _GetWantedNodes(self, self.op.nodes)
4766

    
4767
  def CheckPrereq(self):
4768
    """Check prerequisites.
4769

4770
    """
4771
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4772

    
4773
  def Exec(self, feedback_fn):
4774
    """Compute the list of all the exported system images.
4775

4776
    Returns:
4777
      a dictionary with the structure node->(export-list)
4778
      where export-list is a list of the instances exported on
4779
      that node.
4780

4781
    """
4782
    return self.rpc.call_export_list(self.nodes)
4783

    
4784

    
4785
class LUExportInstance(LogicalUnit):
4786
  """Export an instance to an image in the cluster.
4787

4788
  """
4789
  HPATH = "instance-export"
4790
  HTYPE = constants.HTYPE_INSTANCE
4791
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4792
  REQ_BGL = False
4793

    
4794
  def ExpandNames(self):
4795
    self._ExpandAndLockInstance()
4796
    # FIXME: lock only instance primary and destination node
4797
    #
4798
    # Sad but true, for now we have to lock all nodes, as we don't know where
4799
    # the previous export might be, and in this LU we search for it and
4800
    # remove it from its current node. In the future we could fix this by:
4801
    #  - making a tasklet to search (share-lock all), then create the new one,
4802
    #    then one to remove, after
4803
    #  - removing the removal operation altogether
4804
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4805

    
4806
  def DeclareLocks(self, level):
4807
    """Last minute lock declaration."""
4808
    # All nodes are locked anyway, so nothing to do here.
4809

    
4810
  def BuildHooksEnv(self):
4811
    """Build hooks env.
4812

4813
    This will run on the master, primary node and target node.
4814

4815
    """
4816
    env = {
4817
      "EXPORT_NODE": self.op.target_node,
4818
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4819
      }
4820
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4821
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4822
          self.op.target_node]
4823
    return env, nl, nl
4824

    
4825
  def CheckPrereq(self):
4826
    """Check prerequisites.
4827

4828
    This checks that the instance and node names are valid.
4829

4830
    """
4831
    instance_name = self.op.instance_name
4832
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4833
    assert self.instance is not None, \
4834
          "Cannot retrieve locked instance %s" % self.op.instance_name
4835

    
4836
    self.dst_node = self.cfg.GetNodeInfo(
4837
      self.cfg.ExpandNodeName(self.op.target_node))
4838

    
4839
    assert self.dst_node is not None, \
4840
          "Cannot retrieve locked node %s" % self.op.target_node
4841

    
4842
    # instance disk type verification
4843
    for disk in self.instance.disks:
4844
      if disk.dev_type == constants.LD_FILE:
4845
        raise errors.OpPrereqError("Export not supported for instances with"
4846
                                   " file-based disks")
4847

    
4848
  def Exec(self, feedback_fn):
4849
    """Export an instance to an image in the cluster.
4850

4851
    """
4852
    instance = self.instance
4853
    dst_node = self.dst_node
4854
    src_node = instance.primary_node
4855
    if self.op.shutdown:
4856
      # shutdown the instance, but not the disks
4857
      if not self.rpc.call_instance_shutdown(src_node, instance):
4858
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4859
                                 (instance.name, src_node))
4860

    
4861
    vgname = self.cfg.GetVGName()
4862

    
4863
    snap_disks = []
4864

    
4865
    try:
4866
      for disk in instance.disks:
4867
        if disk.iv_name == "sda":
4868
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4869
          new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4870

    
4871
          if not new_dev_name:
4872
            logger.Error("could not snapshot block device %s on node %s" %
4873
                         (disk.logical_id[1], src_node))
4874
          else:
4875
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4876
                                      logical_id=(vgname, new_dev_name),
4877
                                      physical_id=(vgname, new_dev_name),
4878
                                      iv_name=disk.iv_name)
4879
            snap_disks.append(new_dev)
4880

    
4881
    finally:
4882
      if self.op.shutdown and instance.status == "up":
4883
        if not self.rpc.call_instance_start(src_node, instance, None):
4884
          _ShutdownInstanceDisks(self, instance)
4885
          raise errors.OpExecError("Could not start instance")
4886

    
4887
    # TODO: check for size
4888

    
4889
    cluster_name = self.cfg.GetClusterName()
4890
    for dev in snap_disks:
4891
      if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4892
                                      instance, cluster_name):
4893
        logger.Error("could not export block device %s from node %s to node %s"
4894
                     % (dev.logical_id[1], src_node, dst_node.name))
4895
      if not self.rpc.call_blockdev_remove(src_node, dev):
4896
        logger.Error("could not remove snapshot block device %s from node %s" %
4897
                     (dev.logical_id[1], src_node))
4898

    
4899
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4900
      logger.Error("could not finalize export for instance %s on node %s" %
4901
                   (instance.name, dst_node.name))
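    # By now the snapshots have been exported to dst_node and removed from
    # the source node; finalize_export presumably records the export metadata
    # on the destination so the image can later be imported (this summary is
    # inferred from the call names, not from the node-side implementation).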
4902

    
4903
    nodelist = self.cfg.GetNodeList()
4904
    nodelist.remove(dst_node.name)
4905

    
4906
    # on one-node clusters nodelist will be empty after the removal
4907
    # if we proceed the backup would be removed because OpQueryExports
4908
    # substitutes an empty list with the full cluster node list.
4909
    if nodelist:
4910
      exportlist = self.rpc.call_export_list(nodelist)
4911
      for node in exportlist:
4912
        if instance.name in exportlist[node]:
4913
          if not self.rpc.call_export_remove(node, instance.name):
4914
            logger.Error("could not remove older export for instance %s"
4915
                         " on node %s" % (instance.name, node))
4916

    
4917

    
4918
class LURemoveExport(NoHooksLU):
4919
  """Remove exports related to the named instance.
4920

4921
  """
4922
  _OP_REQP = ["instance_name"]
4923
  REQ_BGL = False
4924

    
4925
  def ExpandNames(self):
4926
    self.needed_locks = {}
4927
    # We need all nodes to be locked in order for RemoveExport to work, but we
4928
    # don't need to lock the instance itself, as nothing will happen to it (and
4929
    # we can remove exports also for a removed instance)
4930
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4931

    
4932
  def CheckPrereq(self):
4933
    """Check prerequisites.
4934
    """
4935
    pass
4936

    
4937
  def Exec(self, feedback_fn):
4938
    """Remove any export.
4939

4940
    """
4941
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4942
    # If the instance was not found we'll try with the name that was passed in.
4943
    # This will only work if it was an FQDN, though.
4944
    fqdn_warn = False
4945
    if not instance_name:
4946
      fqdn_warn = True
4947
      instance_name = self.op.instance_name
4948

    
4949
    exportlist = self.rpc.call_export_list(self.acquired_locks[
4950
      locking.LEVEL_NODE])
4951
    found = False
4952
    for node in exportlist:
4953
      if instance_name in exportlist[node]:
4954
        found = True
4955
        if not self.rpc.call_export_remove(node, instance_name):
4956
          logger.Error("could not remove export for instance %s"
4957
                       " on node %s" % (instance_name, node))
4958

    
4959
    if fqdn_warn and not found:
4960
      feedback_fn("Export not found. If trying to remove an export belonging"
4961
                  " to a deleted instance please use its Fully Qualified"
4962
                  " Domain Name.")
4963

    
4964

    
4965
class TagsLU(NoHooksLU):
4966
  """Generic tags LU.
4967

4968
  This is an abstract class which is the parent of all the other tags LUs.
4969

4970
  """
4971

    
4972
  def ExpandNames(self):
4973
    self.needed_locks = {}
4974
    if self.op.kind == constants.TAG_NODE:
4975
      name = self.cfg.ExpandNodeName(self.op.name)
4976
      if name is None:
4977
        raise errors.OpPrereqError("Invalid node name (%s)" %
4978
                                   (self.op.name,))
4979
      self.op.name = name
4980
      self.needed_locks[locking.LEVEL_NODE] = name
4981
    elif self.op.kind == constants.TAG_INSTANCE:
4982
      name = self.cfg.ExpandInstanceName(self.op.name)
4983
      if name is None:
4984
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4985
                                   (self.op.name,))
4986
      self.op.name = name
4987
      self.needed_locks[locking.LEVEL_INSTANCE] = name
4988

    
4989
  def CheckPrereq(self):
4990
    """Check prerequisites.
4991

4992
    """
4993
    if self.op.kind == constants.TAG_CLUSTER:
4994
      self.target = self.cfg.GetClusterInfo()
4995
    elif self.op.kind == constants.TAG_NODE:
4996
      self.target = self.cfg.GetNodeInfo(self.op.name)
4997
    elif self.op.kind == constants.TAG_INSTANCE:
4998
      self.target = self.cfg.GetInstanceInfo(self.op.name)
4999
    else:
5000
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5001
                                 str(self.op.kind))
5002

    
5003

    
5004
class LUGetTags(TagsLU):
5005
  """Returns the tags of a given object.
5006

5007
  """
5008
  _OP_REQP = ["kind", "name"]
5009
  REQ_BGL = False
5010

    
5011
  def Exec(self, feedback_fn):
5012
    """Returns the tag list.
5013

5014
    """
5015
    return list(self.target.GetTags())
5016

    
5017

    
5018
class LUSearchTags(NoHooksLU):
5019
  """Searches the tags for a given pattern.
5020

5021
  """
5022
  _OP_REQP = ["pattern"]
5023
  REQ_BGL = False
5024

    
5025
  def ExpandNames(self):
5026
    self.needed_locks = {}
5027

    
5028
  def CheckPrereq(self):
5029
    """Check prerequisites.
5030

5031
    This checks the pattern passed for validity by compiling it.
5032

5033
    """
5034
    try:
5035
      self.re = re.compile(self.op.pattern)
5036
    except re.error, err:
5037
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5038
                                 (self.op.pattern, err))
5039

    
5040
  def Exec(self, feedback_fn):
5041
    """Returns the tag list.
5042

5043
    """
5044
    cfg = self.cfg
5045
    tgts = [("/cluster", cfg.GetClusterInfo())]
5046
    ilist = cfg.GetAllInstancesInfo().values()
5047
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5048
    nlist = cfg.GetAllNodesInfo().values()
5049
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5050
    results = []
5051
    for path, target in tgts:
5052
      for tag in target.GetTags():
5053
        if self.re.search(tag):
5054
          results.append((path, tag))
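    # results ends up as a list of (path, tag) pairs, for instance
    # [("/instances/web1.example.com", "prod"), ("/nodes/node1", "rack:a")]
    # (example values only).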
5055
    return results
5056

    
5057

    
5058
class LUAddTags(TagsLU):
5059
  """Sets a tag on a given object.
5060

5061
  """
5062
  _OP_REQP = ["kind", "name", "tags"]
5063
  REQ_BGL = False
5064

    
5065
  def CheckPrereq(self):
5066
    """Check prerequisites.
5067

5068
    This checks the type and length of the tag name and value.
5069

5070
    """
5071
    TagsLU.CheckPrereq(self)
5072
    for tag in self.op.tags:
5073
      objects.TaggableObject.ValidateTag(tag)
5074

    
5075
  def Exec(self, feedback_fn):
5076
    """Sets the tag.
5077

5078
    """
5079
    try:
5080
      for tag in self.op.tags:
5081
        self.target.AddTag(tag)
5082
    except errors.TagError, err:
5083
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5084
    try:
5085
      self.cfg.Update(self.target)
5086
    except errors.ConfigurationError:
5087
      raise errors.OpRetryError("There has been a modification to the"
5088
                                " config file and the operation has been"
5089
                                " aborted. Please retry.")
5090

    
5091

    
5092
class LUDelTags(TagsLU):
5093
  """Delete a list of tags from a given object.
5094

5095
  """
5096
  _OP_REQP = ["kind", "name", "tags"]
5097
  REQ_BGL = False
5098

    
5099
  def CheckPrereq(self):
5100
    """Check prerequisites.
5101

5102
    This checks that we have the given tag.
5103

5104
    """
5105
    TagsLU.CheckPrereq(self)
5106
    for tag in self.op.tags:
5107
      objects.TaggableObject.ValidateTag(tag)
5108
    del_tags = frozenset(self.op.tags)
5109
    cur_tags = self.target.GetTags()
5110
    if not del_tags <= cur_tags:
5111
      diff_tags = del_tags - cur_tags
5112
      diff_names = ["'%s'" % tag for tag in diff_tags]
5113
      diff_names.sort()
5114
      raise errors.OpPrereqError("Tag(s) %s not found" %
5115
                                 (",".join(diff_names)))
5116

    
5117
  def Exec(self, feedback_fn):
5118
    """Remove the tag from the object.
5119

5120
    """
5121
    for tag in self.op.tags:
5122
      self.target.RemoveTag(tag)
5123
    try:
5124
      self.cfg.Update(self.target)
5125
    except errors.ConfigurationError:
5126
      raise errors.OpRetryError("There has been a modification to the"
5127
                                " config file and the operation has been"
5128
                                " aborted. Please retry.")
5129

    
5130

    
5131
class LUTestDelay(NoHooksLU):
5132
  """Sleep for a specified amount of time.
5133

5134
  This LU sleeps on the master and/or nodes for a specified amount of
5135
  time.
5136

5137
  """
5138
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5139
  REQ_BGL = False
5140

    
5141
  def ExpandNames(self):
5142
    """Expand names and set required locks.
5143

5144
    This expands the node list, if any.
5145

5146
    """
5147
    self.needed_locks = {}
5148
    if self.op.on_nodes:
5149
      # _GetWantedNodes can be used here, but is not always appropriate to use
5150
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5151
      # more information.
5152
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5153
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5154

    
5155
  def CheckPrereq(self):
5156
    """Check prerequisites.
5157

5158
    """
5159

    
5160
  def Exec(self, feedback_fn):
5161
    """Do the actual sleep.
5162

5163
    """
5164
    if self.op.on_master:
5165
      if not utils.TestDelay(self.op.duration):
5166
        raise errors.OpExecError("Error during master delay test")
5167
    if self.op.on_nodes:
5168
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5169
      if not result:
5170
        raise errors.OpExecError("Complete failure from rpc call")
5171
      for node, node_result in result.items():
5172
        if not node_result:
5173
          raise errors.OpExecError("Failure during rpc call to node %s,"
5174
                                   " result: %s" % (node, node_result))
5175

    
5176

    
5177
class IAllocator(object):
5178
  """IAllocator framework.
5179

5180
  An IAllocator instance has four sets of attributes:
5181
    - cfg that is needed to query the cluster
5182
    - input data (all members of the _KEYS class attribute are required)
5183
    - four buffer attributes (in|out_data|text), that represent the
5184
      input (to the external script) in text and data structure format,
5185
      and the output from it, again in two formats
5186
    - the result variables from the script (success, info, nodes) for
5187
      easy usage
5188

5189
  """
5190
  _ALLO_KEYS = [
5191
    "mem_size", "disks", "disk_template",
5192
    "os", "tags", "nics", "vcpus",
5193
    ]
5194
  _RELO_KEYS = [
5195
    "relocate_from",
5196
    ]
5197

    
5198
  def __init__(self, lu, mode, name, **kwargs):
5199
    self.lu = lu
5200
    # init buffer variables
5201
    self.in_text = self.out_text = self.in_data = self.out_data = None
5202
    # init all input fields so that pylint is happy
5203
    self.mode = mode
5204
    self.name = name
5205
    self.mem_size = self.disks = self.disk_template = None
5206
    self.os = self.tags = self.nics = self.vcpus = None
5207
    self.relocate_from = None
5208
    # computed fields
5209
    self.required_nodes = None
5210
    # init result fields
5211
    self.success = self.info = self.nodes = None
5212
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5213
      keyset = self._ALLO_KEYS
5214
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5215
      keyset = self._RELO_KEYS
5216
    else:
5217
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5218
                                   " IAllocator" % self.mode)
5219
    for key in kwargs:
5220
      if key not in keyset:
5221
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5222
                                     " IAllocator" % key)
5223
      setattr(self, key, kwargs[key])
5224
    for key in keyset:
5225
      if key not in kwargs:
5226
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5227
                                     " IAllocator" % key)
5228
    self._BuildInputData()
5229

    
5230
  def _ComputeClusterData(self):
5231
    """Compute the generic allocator input data.
5232

5233
    This is the data that is independent of the actual operation.
5234

5235
    """
5236
    cfg = self.lu.cfg
5237
    cluster_info = cfg.GetClusterInfo()
5238
    # cluster data
5239
    data = {
5240
      "version": 1,
5241
      "cluster_name": cfg.GetClusterName(),
5242
      "cluster_tags": list(cluster_info.GetTags()),
5243
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5244
      # we don't have job IDs
5245
      }
5246

    
5247
    i_list = []
5248
    cluster = cfg.GetClusterInfo()
5249
    for iname in cfg.GetInstanceList():
5250
      i_obj = cfg.GetInstanceInfo(iname)
5251
      i_list.append((i_obj, cluster.FillBE(i_obj)))
5252

    
5253
    # node data
5254
    node_results = {}
5255
    node_list = cfg.GetNodeList()
5256
    # FIXME: here we have only one hypervisor information, but
5257
    # instance can belong to different hypervisors
5258
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5259
                                           cfg.GetHypervisorType())
5260
    for nname in node_list:
5261
      ninfo = cfg.GetNodeInfo(nname)
5262
      if nname not in node_data or not isinstance(node_data[nname], dict):
5263
        raise errors.OpExecError("Can't get data for node %s" % nname)
5264
      remote_info = node_data[nname]
5265
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5266
                   'vg_size', 'vg_free', 'cpu_total']:
5267
        if attr not in remote_info:
5268
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5269
                                   (nname, attr))
5270
        try:
5271
          remote_info[attr] = int(remote_info[attr])
5272
        except ValueError, err:
5273
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5274
                                   " %s" % (nname, attr, str(err)))
5275
      # compute memory used by primary instances
5276
      i_p_mem = i_p_up_mem = 0
5277
      for iinfo, beinfo in i_list:
5278
        if iinfo.primary_node == nname:
5279
          i_p_mem += beinfo[constants.BE_MEMORY]
5280
          if iinfo.status == "up":
5281
            i_p_up_mem += beinfo[constants.BE_MEMORY]
5282

    
5283
      # compute memory used by instances
5284
      pnr = {
5285
        "tags": list(ninfo.GetTags()),
5286
        "total_memory": remote_info['memory_total'],
5287
        "reserved_memory": remote_info['memory_dom0'],
5288
        "free_memory": remote_info['memory_free'],
5289
        "i_pri_memory": i_p_mem,
5290
        "i_pri_up_memory": i_p_up_mem,
5291
        "total_disk": remote_info['vg_size'],
5292
        "free_disk": remote_info['vg_free'],
5293
        "primary_ip": ninfo.primary_ip,
5294
        "secondary_ip": ninfo.secondary_ip,
5295
        "total_cpus": remote_info['cpu_total'],
5296
        }
5297
      node_results[nname] = pnr
5298
    data["nodes"] = node_results
5299

    
5300
    # instance data
5301
    instance_data = {}
5302
    for iinfo, beinfo in i_list:
5303
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5304
                  for n in iinfo.nics]
5305
      pir = {
5306
        "tags": list(iinfo.GetTags()),
5307
        "should_run": iinfo.status == "up",
5308
        "vcpus": beinfo[constants.BE_VCPUS],
5309
        "memory": beinfo[constants.BE_MEMORY],
5310
        "os": iinfo.os,
5311
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5312
        "nics": nic_data,
5313
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5314
        "disk_template": iinfo.disk_template,
5315
        "hypervisor": iinfo.hypervisor,
5316
        }
5317
      instance_data[iinfo.name] = pir
5318

    
5319
    data["instances"] = instance_data
5320

    
5321
    self.in_data = data
5322

    
5323
  def _AddNewInstance(self):
5324
    """Add new instance data to allocator structure.
5325

5326
    This in combination with _AllocatorGetClusterData will create the
5327
    correct structure needed as input for the allocator.
5328

5329
    The checks for the completeness of the opcode must have already been
5330
    done.
5331

5332
    """
5333
    data = self.in_data
5334
    if len(self.disks) != 2:
5335
      raise errors.OpExecError("Only two-disk configurations supported")
5336

    
5337
    disk_space = _ComputeDiskSize(self.disk_template,
5338
                                  self.disks[0]["size"], self.disks[1]["size"])
5339

    
5340
    if self.disk_template in constants.DTS_NET_MIRROR:
5341
      self.required_nodes = 2
5342
    else:
5343
      self.required_nodes = 1
5344
    request = {
5345
      "type": "allocate",
5346
      "name": self.name,
5347
      "disk_template": self.disk_template,
5348
      "tags": self.tags,
5349
      "os": self.os,
5350
      "vcpus": self.vcpus,
5351
      "memory": self.mem_size,
5352
      "disks": self.disks,
5353
      "disk_space_total": disk_space,
5354
      "nics": self.nics,
5355
      "required_nodes": self.required_nodes,
5356
      }
5357
    data["request"] = request
5358

    
5359
  def _AddRelocateInstance(self):
5360
    """Add relocate instance data to allocator structure.
5361

5362
    This in combination with _IAllocatorGetClusterData will create the
5363
    correct structure needed as input for the allocator.
5364

5365
    The checks for the completeness of the opcode must have already been
5366
    done.
5367

5368
    """
5369
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5370
    if instance is None:
5371
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5372
                                   " IAllocator" % self.name)
5373

    
5374
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5375
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5376

    
5377
    if len(instance.secondary_nodes) != 1:
5378
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5379

    
5380
    self.required_nodes = 1
5381

    
5382
    disk_space = _ComputeDiskSize(instance.disk_template,
5383
                                  instance.disks[0].size,
5384
                                  instance.disks[1].size)
5385

    
5386
    request = {
5387
      "type": "relocate",
5388
      "name": self.name,
5389
      "disk_space_total": disk_space,
5390
      "required_nodes": self.required_nodes,
5391
      "relocate_from": self.relocate_from,
5392
      }
5393
    self.in_data["request"] = request
5394

    
5395
  def _BuildInputData(self):
5396
    """Build input data structures.
5397

5398
    """
5399
    self._ComputeClusterData()
5400

    
5401
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5402
      self._AddNewInstance()
5403
    else:
5404
      self._AddRelocateInstance()
5405

    
5406
    self.in_text = serializer.Dump(self.in_data)
5407

    
5408
  def Run(self, name, validate=True, call_fn=None):
5409
    """Run an instance allocator and return the results.
5410

5411
    """
5412
    if call_fn is None:
5413
      call_fn = self.lu.rpc.call_iallocator_runner
5414
    data = self.in_text
5415

    
5416
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5417

    
5418
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5419
      raise errors.OpExecError("Invalid result from master iallocator runner")
5420

    
5421
    rcode, stdout, stderr, fail = result
5422

    
5423
    if rcode == constants.IARUN_NOTFOUND:
5424
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5425
    elif rcode == constants.IARUN_FAILURE:
5426
      raise errors.OpExecError("Instance allocator call failed: %s,"
5427
                               " output: %s" % (fail, stdout+stderr))
5428
    self.out_text = stdout
5429
    if validate:
5430
      self._ValidateResult()
5431

    
5432
  def _ValidateResult(self):
5433
    """Process the allocator results.
5434

5435
    This will process and if successful save the result in
5436
    self.out_data and the other parameters.
5437

5438
    """
5439
    try:
5440
      rdict = serializer.Load(self.out_text)
5441
    except Exception, err:
5442
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5443

    
5444
    if not isinstance(rdict, dict):
5445
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5446

    
5447
    for key in "success", "info", "nodes":
5448
      if key not in rdict:
5449
        raise errors.OpExecError("Can't parse iallocator results:"
5450
                                 " missing key '%s'" % key)
5451
      setattr(self, key, rdict[key])
5452

    
5453
    if not isinstance(rdict["nodes"], list):
5454
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5455
                               " is not a list")
5456
    self.out_data = rdict
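    # A well-formed allocator reply therefore parses to something like
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node2.example.com", "node3.example.com"]}
    # (values are illustrative only).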
5457

    
5458

    
5459
class LUTestAllocator(NoHooksLU):
5460
  """Run allocator tests.
5461

5462
  This LU runs the allocator tests
5463

5464
  """
5465
  _OP_REQP = ["direction", "mode", "name"]
5466

    
5467
  def CheckPrereq(self):
5468
    """Check prerequisites.
5469

5470
    This checks the opcode parameters depending on the direction and mode.
5471

5472
    """
5473
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5474
      for attr in ["name", "mem_size", "disks", "disk_template",
5475
                   "os", "tags", "nics", "vcpus"]:
5476
        if not hasattr(self.op, attr):
5477
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5478
                                     attr)
5479
      iname = self.cfg.ExpandInstanceName(self.op.name)
5480
      if iname is not None:
5481
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5482
                                   iname)
5483
      if not isinstance(self.op.nics, list):
5484
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5485
      for row in self.op.nics:
5486
        if (not isinstance(row, dict) or
5487
            "mac" not in row or
5488
            "ip" not in row or
5489
            "bridge" not in row):
5490
          raise errors.OpPrereqError("Invalid contents of the"
5491
                                     " 'nics' parameter")
5492
      if not isinstance(self.op.disks, list):
5493
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5494
      if len(self.op.disks) != 2:
5495
        raise errors.OpPrereqError("Only two-disk configurations supported")
5496
      for row in self.op.disks:
5497
        if (not isinstance(row, dict) or
5498
            "size" not in row or
5499
            not isinstance(row["size"], int) or
5500
            "mode" not in row or
5501
            row["mode"] not in ['r', 'w']):
5502
          raise errors.OpPrereqError("Invalid contents of the"
5503
                                     " 'disks' parameter")
5504
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5505
      if not hasattr(self.op, "name"):
5506
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5507
      fname = self.cfg.ExpandInstanceName(self.op.name)
5508
      if fname is None:
5509
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5510
                                   self.op.name)
5511
      self.op.name = fname
5512
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5513
    else:
5514
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5515
                                 self.op.mode)
5516

    
5517
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5518
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5519
        raise errors.OpPrereqError("Missing allocator name")
5520
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5521
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5522
                                 self.op.direction)
5523

    
5524
  def Exec(self, feedback_fn):
5525
    """Run the allocator test.
5526

5527
    """
5528
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5529
      ial = IAllocator(self,
5530
                       mode=self.op.mode,
5531
                       name=self.op.name,
5532
                       mem_size=self.op.mem_size,
5533
                       disks=self.op.disks,
5534
                       disk_template=self.op.disk_template,
5535
                       os=self.op.os,
5536
                       tags=self.op.tags,
5537
                       nics=self.op.nics,
5538
                       vcpus=self.op.vcpus,
5539
                       )
5540
    else:
5541
      ial = IAllocator(self,
5542
                       mode=self.op.mode,
5543
                       name=self.op.name,
5544
                       relocate_from=list(self.relocate_from),
5545
                       )
5546

    
5547
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5548
      result = ial.in_text
5549
    else:
5550
      ial.Run(self.op.allocator, validate=False)
5551
      result = ial.out_text
5552
    return result