#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging

from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

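  # Illustrative sketch (editor's addition, not part of the upstream module):
  # an ExpandNames body for a hypothetical concurrent LU that wants all node
  # locks, but in shared mode, as described in the docstring above.
  #
  #   def ExpandNames(self):
  #     self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
  #     self.share_locks[locking.LEVEL_NODE] = 1  # share instead of exclusive
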
  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

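  # Illustrative sketch (editor's addition): a DeclareLocks implementation for
  # a hypothetical instance-level LU that computes its node locks only after
  # the instance locks have been acquired, using the _LockInstancesNodes
  # helper defined further down in this class.
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()
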
  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

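  # Illustrative sketch (editor's addition): a typical BuildHooksEnv return
  # value for a hypothetical instance LU; "node1.example.com" is an invented
  # name. The env dict, the pre-execution node list and the post-execution
  # node list form the three-element tuple described above.
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.instance_name}
  #     nl = [self.cfg.GetMasterNode(), "node1.example.com"]
  #     return env, nl, nl
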
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


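# Illustrative sketch (editor's addition, not part of the upstream module): a
# minimal concurrent LU following the LogicalUnit rules above. The opcode
# field "message" is hypothetical.
#
#   class LUExample(NoHooksLU):
#     _OP_REQP = ["message"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}  # no locks needed
#
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn(self.op.message)

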
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


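# Illustrative sketch (editor's addition): how the query LUs below use
# _CheckOutputFields; the field names here are invented.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])  # OK
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])          # raises OpPrereqError

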
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


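# Illustrative sketch (editor's addition): for a single-NIC instance the
# function above yields an environment shaped like the following (all values
# invented):
#
#   {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01", "INSTANCE_NIC_COUNT": 1}

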
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

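  # Illustrative sketch (editor's addition), with invented numbers: if node C
  # reports mfree=1024 and is secondary for two instances of 512 and 768 MB
  # whose primary is node A, the check above flags C, since 1024 < 512 + 768
  # means C could not start all of A's instances should A fail.
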
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes,
    # if non-empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
1607
            val = vol['vg']
1608
          elif field == "name":
1609
            val = vol['name']
1610
          elif field == "size":
1611
            val = int(float(vol['size']))
1612
          elif field == "instance":
1613
            for inst in ilist:
1614
              if node not in lv_by_node[inst]:
1615
                continue
1616
              if vol['name'] in lv_by_node[inst][node]:
1617
                val = inst.name
1618
                break
1619
            else:
1620
              val = '-'
1621
          else:
1622
            raise errors.ParameterError(field)
1623
          node_output.append(str(val))
1624

    
1625
        output.append(node_output)
1626

    
1627
    return output
1628

    
1629

    
1630
class LUAddNode(LogicalUnit):
1631
  """Logical unit for adding node to the cluster.
1632

1633
  """
1634
  HPATH = "node-add"
1635
  HTYPE = constants.HTYPE_NODE
1636
  _OP_REQP = ["node_name"]
1637

    
1638
  def BuildHooksEnv(self):
1639
    """Build hooks env.
1640

1641
    This will run on all nodes before, and on all nodes + the new node after.
1642

1643
    """
1644
    env = {
1645
      "OP_TARGET": self.op.node_name,
1646
      "NODE_NAME": self.op.node_name,
1647
      "NODE_PIP": self.op.primary_ip,
1648
      "NODE_SIP": self.op.secondary_ip,
1649
      }
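    # Hypothetical example of the resulting environment for a dual-homed
    # node (names and addresses are illustrative only):
    #   {"OP_TARGET": "node4.example.com", "NODE_NAME": "node4.example.com",
    #    "NODE_PIP": "192.0.2.14", "NODE_SIP": "10.0.0.14"}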
1650
    nodes_0 = self.cfg.GetNodeList()
1651
    nodes_1 = nodes_0 + [self.op.node_name, ]
1652
    return env, nodes_0, nodes_1
1653

    
1654
  def CheckPrereq(self):
1655
    """Check prerequisites.
1656

1657
    This checks:
1658
     - the new node is not already in the config
1659
     - it is resolvable
1660
     - its parameters (single/dual homed) match the cluster
1661

1662
    Any errors are signalled by raising errors.OpPrereqError.
1663

1664
    """
1665
    node_name = self.op.node_name
1666
    cfg = self.cfg
1667

    
1668
    dns_data = utils.HostInfo(node_name)
1669

    
1670
    node = dns_data.name
1671
    primary_ip = self.op.primary_ip = dns_data.ip
1672
    secondary_ip = getattr(self.op, "secondary_ip", None)
1673
    if secondary_ip is None:
1674
      secondary_ip = primary_ip
1675
    if not utils.IsValidIP(secondary_ip):
1676
      raise errors.OpPrereqError("Invalid secondary IP given")
1677
    self.op.secondary_ip = secondary_ip
1678

    
1679
    node_list = cfg.GetNodeList()
1680
    if not self.op.readd and node in node_list:
1681
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1682
                                 node)
1683
    elif self.op.readd and node not in node_list:
1684
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1685

    
1686
    for existing_node_name in node_list:
1687
      existing_node = cfg.GetNodeInfo(existing_node_name)
1688

    
1689
      if self.op.readd and node == existing_node_name:
1690
        if (existing_node.primary_ip != primary_ip or
1691
            existing_node.secondary_ip != secondary_ip):
1692
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1693
                                     " address configuration as before")
1694
        continue
1695

    
1696
      if (existing_node.primary_ip == primary_ip or
1697
          existing_node.secondary_ip == primary_ip or
1698
          existing_node.primary_ip == secondary_ip or
1699
          existing_node.secondary_ip == secondary_ip):
1700
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1701
                                   " existing node %s" % existing_node.name)
1702

    
1703
    # check that the type of the node (single versus dual homed) is the
1704
    # same as for the master
1705
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1706
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1707
    newbie_singlehomed = secondary_ip == primary_ip
1708
    if master_singlehomed != newbie_singlehomed:
1709
      if master_singlehomed:
1710
        raise errors.OpPrereqError("The master has no private ip but the"
1711
                                   " new node has one")
1712
      else:
1713
        raise errors.OpPrereqError("The master has a private ip but the"
1714
                                   " new node doesn't have one")
1715

    
1716
    # checks reachability
1717
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1718
      raise errors.OpPrereqError("Node not reachable by ping")
1719

    
1720
    if not newbie_singlehomed:
1721
      # check reachability from my secondary ip to newbie's secondary ip
1722
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1723
                           source=myself.secondary_ip):
1724
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1725
                                   " based ping to noded port")
1726

    
1727
    self.new_node = objects.Node(name=node,
1728
                                 primary_ip=primary_ip,
1729
                                 secondary_ip=secondary_ip)
1730

    
1731
  def Exec(self, feedback_fn):
1732
    """Adds the new node to the cluster.
1733

1734
    """
1735
    new_node = self.new_node
1736
    node = new_node.name
1737

    
1738
    # check connectivity
1739
    result = self.rpc.call_version([node])[node]
1740
    if result:
1741
      if constants.PROTOCOL_VERSION == result:
1742
        logger.Info("communication to node %s fine, sw version %s match" %
1743
                    (node, result))
1744
      else:
1745
        raise errors.OpExecError("Version mismatch master version %s,"
1746
                                 " node version %s" %
1747
                                 (constants.PROTOCOL_VERSION, result))
1748
    else:
1749
      raise errors.OpExecError("Cannot get version from the new node")
1750

    
1751
    # setup ssh on node
1752
    logger.Info("copy ssh key to node %s" % node)
1753
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1754
    keyarray = []
1755
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1756
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1757
                priv_key, pub_key]
1758

    
1759
    for i in keyfiles:
1760
      f = open(i, 'r')
1761
      try:
1762
        keyarray.append(f.read())
1763
      finally:
1764
        f.close()
1765

    
1766
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1767
                                    keyarray[2],
1768
                                    keyarray[3], keyarray[4], keyarray[5])
1769

    
1770
    if not result:
1771
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1772

    
1773
    # Add node to our /etc/hosts, and add key to known_hosts
1774
    utils.AddHostToEtcHosts(new_node.name)
1775

    
1776
    if new_node.secondary_ip != new_node.primary_ip:
1777
      if not self.rpc.call_node_has_ip_address(new_node.name,
1778
                                               new_node.secondary_ip):
1779
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1780
                                 " you gave (%s). Please fix and re-run this"
1781
                                 " command." % new_node.secondary_ip)
1782

    
1783
    node_verify_list = [self.cfg.GetMasterNode()]
1784
    node_verify_param = {
1785
      'nodelist': [node],
1786
      # TODO: do a node-net-test as well?
1787
    }
1788

    
1789
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1790
                                       self.cfg.GetClusterName())
1791
    for verifier in node_verify_list:
1792
      if not result[verifier]:
1793
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1794
                                 " for remote verification" % verifier)
1795
      if result[verifier]['nodelist']:
1796
        for failed in result[verifier]['nodelist']:
1797
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1798
                      (verifier, result[verifier]['nodelist'][failed]))
1799
        raise errors.OpExecError("ssh/hostname verification failed.")
1800

    
1801
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1802
    # including the node just added
1803
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1804
    dist_nodes = self.cfg.GetNodeList()
1805
    if not self.op.readd:
1806
      dist_nodes.append(node)
1807
    if myself.name in dist_nodes:
1808
      dist_nodes.remove(myself.name)
1809

    
1810
    logger.Debug("Copying hosts and known_hosts to all nodes")
1811
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1812
      result = self.rpc.call_upload_file(dist_nodes, fname)
1813
      for to_node in dist_nodes:
1814
        if not result[to_node]:
1815
          logger.Error("copy of file %s to node %s failed" %
1816
                       (fname, to_node))
1817

    
1818
    to_copy = []
1819
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1820
      to_copy.append(constants.VNC_PASSWORD_FILE)
1821
    for fname in to_copy:
1822
      result = self.rpc.call_upload_file([node], fname)
1823
      if not result[node]:
1824
        logger.Error("could not copy file %s to node %s" % (fname, node))
1825

    
1826
    if self.op.readd:
1827
      self.context.ReaddNode(new_node)
1828
    else:
1829
      self.context.AddNode(new_node)
1830

    
1831

    
1832
class LUQueryClusterInfo(NoHooksLU):
1833
  """Query cluster configuration.
1834

1835
  """
1836
  _OP_REQP = []
1837
  REQ_MASTER = False
1838
  REQ_BGL = False
1839

    
1840
  def ExpandNames(self):
1841
    self.needed_locks = {}
1842

    
1843
  def CheckPrereq(self):
1844
    """No prerequsites needed for this LU.
1845

1846
    """
1847
    pass
1848

    
1849
  def Exec(self, feedback_fn):
1850
    """Return cluster config.
1851

1852
    """
1853
    result = {
1854
      "name": self.cfg.GetClusterName(),
1855
      "software_version": constants.RELEASE_VERSION,
1856
      "protocol_version": constants.PROTOCOL_VERSION,
1857
      "config_version": constants.CONFIG_VERSION,
1858
      "os_api_version": constants.OS_API_VERSION,
1859
      "export_version": constants.EXPORT_VERSION,
1860
      "master": self.cfg.GetMasterNode(),
1861
      "architecture": (platform.architecture()[0], platform.machine()),
1862
      "hypervisor_type": self.cfg.GetHypervisorType(),
1863
      "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1864
      }
1865

    
1866
    return result
1867

    
1868

    
1869
class LUQueryConfigValues(NoHooksLU):
1870
  """Return configuration values.
1871

1872
  """
1873
  _OP_REQP = []
1874
  REQ_BGL = False
1875

    
1876
  def ExpandNames(self):
1877
    self.needed_locks = {}
1878

    
1879
    static_fields = ["cluster_name", "master_node"]
1880
    _CheckOutputFields(static=static_fields,
1881
                       dynamic=[],
1882
                       selected=self.op.output_fields)
1883

    
1884
  def CheckPrereq(self):
1885
    """No prerequisites.
1886

1887
    """
1888
    pass
1889

    
1890
  def Exec(self, feedback_fn):
1891
    """Dump a representation of the cluster config to the standard output.
1892

1893
    """
1894
    values = []
1895
    for field in self.op.output_fields:
1896
      if field == "cluster_name":
1897
        values.append(self.cfg.GetClusterName())
1898
      elif field == "master_node":
1899
        values.append(self.cfg.GetMasterNode())
1900
      else:
1901
        raise errors.ParameterError(field)
1902
    return values
1903

    
1904

    
1905
class LUActivateInstanceDisks(NoHooksLU):
1906
  """Bring up an instance's disks.
1907

1908
  """
1909
  _OP_REQP = ["instance_name"]
1910
  REQ_BGL = False
1911

    
1912
  def ExpandNames(self):
1913
    self._ExpandAndLockInstance()
1914
    self.needed_locks[locking.LEVEL_NODE] = []
1915
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
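    # The node locks cannot be named yet (the instance's configuration has
    # not been read), so we declare an empty list plus LOCKS_REPLACE and let
    # DeclareLocks/_LockInstancesNodes substitute the right nodes later.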
1916

    
1917
  def DeclareLocks(self, level):
1918
    if level == locking.LEVEL_NODE:
1919
      self._LockInstancesNodes()
1920

    
1921
  def CheckPrereq(self):
1922
    """Check prerequisites.
1923

1924
    This checks that the instance is in the cluster.
1925

1926
    """
1927
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1928
    assert self.instance is not None, \
1929
      "Cannot retrieve locked instance %s" % self.op.instance_name
1930

    
1931
  def Exec(self, feedback_fn):
1932
    """Activate the disks.
1933

1934
    """
1935
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1936
    if not disks_ok:
1937
      raise errors.OpExecError("Cannot activate block devices")
1938

    
1939
    return disks_info
1940

    
1941

    
1942
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
1943
  """Prepare the block devices for an instance.
1944

1945
  This sets up the block devices on all nodes.
1946

1947
  Args:
1948
    instance: a ganeti.objects.Instance object
1949
    ignore_secondaries: if true, errors on secondary nodes won't result
1950
                        in an error return from the function
1951

1952
  Returns:
1953
    false if the operation failed
1954
    list of (host, instance_visible_name, node_visible_name) if the operation
1955
         succeeded, with the mapping from node devices to instance devices
1956
  """
1957
  device_info = []
1958
  disks_ok = True
1959
  iname = instance.name
1960
  # With the two passes mechanism we try to reduce the window of
1961
  # opportunity for the race condition of switching DRBD to primary
1962
  # before handshaking occurred, but we do not eliminate it
1963

    
1964
  # The proper fix would be to wait (with some limits) until the
1965
  # connection has been made and drbd transitions from WFConnection
1966
  # into any other network-connected state (Connected, SyncTarget,
1967
  # SyncSource, etc.)
1968

    
1969
  # 1st pass, assemble on all nodes in secondary mode
1970
  for inst_disk in instance.disks:
1971
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1972
      lu.cfg.SetDiskID(node_disk, node)
1973
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
1974
      if not result:
1975
        logger.Error("could not prepare block device %s on node %s"
1976
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1977
        if not ignore_secondaries:
1978
          disks_ok = False
1979

    
1980
  # FIXME: race condition on drbd migration to primary
1981

    
1982
  # 2nd pass, do only the primary node
1983
  for inst_disk in instance.disks:
1984
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1985
      if node != instance.primary_node:
1986
        continue
1987
      lu.cfg.SetDiskID(node_disk, node)
1988
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
1989
      if not result:
1990
        logger.Error("could not prepare block device %s on node %s"
1991
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1992
        disks_ok = False
1993
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1994

    
1995
  # leave the disks configured for the primary node
1996
  # this is a workaround that would be fixed better by
1997
  # improving the logical/physical id handling
1998
  for disk in instance.disks:
1999
    lu.cfg.SetDiskID(disk, instance.primary_node)
2000

    
2001
  return disks_ok, device_info
2002

    
2003

    
2004
def _StartInstanceDisks(lu, instance, force):
2005
  """Start the disks of an instance.
2006

2007
  """
2008
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2009
                                           ignore_secondaries=force)
2010
  if not disks_ok:
2011
    _ShutdownInstanceDisks(lu, instance)
2012
    if force is not None and not force:
2013
      logger.Error("If the message above refers to a secondary node,"
2014
                   " you can retry the operation using '--force'.")
2015
    raise errors.OpExecError("Disk consistency error")
2016

    
2017

    
2018
class LUDeactivateInstanceDisks(NoHooksLU):
2019
  """Shutdown an instance's disks.
2020

2021
  """
2022
  _OP_REQP = ["instance_name"]
2023
  REQ_BGL = False
2024

    
2025
  def ExpandNames(self):
2026
    self._ExpandAndLockInstance()
2027
    self.needed_locks[locking.LEVEL_NODE] = []
2028
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2029

    
2030
  def DeclareLocks(self, level):
2031
    if level == locking.LEVEL_NODE:
2032
      self._LockInstancesNodes()
2033

    
2034
  def CheckPrereq(self):
2035
    """Check prerequisites.
2036

2037
    This checks that the instance is in the cluster.
2038

2039
    """
2040
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2041
    assert self.instance is not None, \
2042
      "Cannot retrieve locked instance %s" % self.op.instance_name
2043

    
2044
  def Exec(self, feedback_fn):
2045
    """Deactivate the disks
2046

2047
    """
2048
    instance = self.instance
2049
    _SafeShutdownInstanceDisks(self, instance)
2050

    
2051

    
2052
def _SafeShutdownInstanceDisks(lu, instance):
2053
  """Shutdown block devices of an instance.
2054

2055
  This function checks if an instance is running, before calling
2056
  _ShutdownInstanceDisks.
2057

2058
  """
2059
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2060
                                      [instance.hypervisor])
2061
  ins_l = ins_l[instance.primary_node]
2062
  if not isinstance(ins_l, list):
2063
    raise errors.OpExecError("Can't contact node '%s'" %
2064
                             instance.primary_node)
2065

    
2066
  if instance.name in ins_l:
2067
    raise errors.OpExecError("Instance is running, can't shutdown"
2068
                             " block devices.")
2069

    
2070
  _ShutdownInstanceDisks(lu, instance)
2071

    
2072

    
2073
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2074
  """Shutdown block devices of an instance.
2075

2076
  This does the shutdown on all nodes of the instance.
2077

2078
  If ignore_primary is true, errors on the primary node are ignored;
2079
  otherwise they make the function return failure.
2080

2081
  """
2082
  result = True
2083
  for disk in instance.disks:
2084
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2085
      lu.cfg.SetDiskID(top_disk, node)
2086
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2087
        logger.Error("could not shutdown block device %s on node %s" %
2088
                     (disk.iv_name, node))
2089
        if not ignore_primary or node != instance.primary_node:
2090
          result = False
2091
  return result
2092

    
2093

    
2094
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2095
  """Checks if a node has enough free memory.
2096

2097
  This function check if a given node has the needed amount of free
2098
  memory. In case the node has less memory or we cannot get the
2099
  information from the node, this function raise an OpPrereqError
2100
  exception.
2101

2102
  @type lu: C{LogicalUnit}
2103
  @param lu: a logical unit from which we get configuration data
2104
  @type node: C{str}
2105
  @param node: the node to check
2106
  @type reason: C{str}
2107
  @param reason: string to use in the error message
2108
  @type requested: C{int}
2109
  @param requested: the amount of memory in MiB to check for
2110
  @type hypervisor: C{str}
2111
  @param hypervisor: the hypervisor to ask for memory stats
2112
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2113
      we cannot check the node
2114

2115
  """
2116
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2117
  if not nodeinfo or not isinstance(nodeinfo, dict):
2118
    raise errors.OpPrereqError("Could not contact node %s for resource"
2119
                             " information" % (node,))
2120

    
2121
  free_mem = nodeinfo[node].get('memory_free')
2122
  if not isinstance(free_mem, int):
2123
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2124
                             " was '%s'" % (node, free_mem))
2125
  if requested > free_mem:
2126
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2127
                             " needed %s MiB, available %s MiB" %
2128
                             (node, reason, requested, free_mem))
2129

    
2130

    
2131
class LUStartupInstance(LogicalUnit):
2132
  """Starts an instance.
2133

2134
  """
2135
  HPATH = "instance-start"
2136
  HTYPE = constants.HTYPE_INSTANCE
2137
  _OP_REQP = ["instance_name", "force"]
2138
  REQ_BGL = False
2139

    
2140
  def ExpandNames(self):
2141
    self._ExpandAndLockInstance()
2142
    self.needed_locks[locking.LEVEL_NODE] = []
2143
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2144

    
2145
  def DeclareLocks(self, level):
2146
    if level == locking.LEVEL_NODE:
2147
      self._LockInstancesNodes()
2148

    
2149
  def BuildHooksEnv(self):
2150
    """Build hooks env.
2151

2152
    This runs on master, primary and secondary nodes of the instance.
2153

2154
    """
2155
    env = {
2156
      "FORCE": self.op.force,
2157
      }
2158
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2159
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2160
          list(self.instance.secondary_nodes))
2161
    return env, nl, nl
2162

    
2163
  def CheckPrereq(self):
2164
    """Check prerequisites.
2165

2166
    This checks that the instance is in the cluster.
2167

2168
    """
2169
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2170
    assert self.instance is not None, \
2171
      "Cannot retrieve locked instance %s" % self.op.instance_name
2172

    
2173
    # check bridge existence
2174
    _CheckInstanceBridgesExist(self, instance)
2175

    
2176
    _CheckNodeFreeMemory(self, instance.primary_node,
2177
                         "starting instance %s" % instance.name,
2178
                         instance.memory, instance.hypervisor)
2179

    
2180
  def Exec(self, feedback_fn):
2181
    """Start the instance.
2182

2183
    """
2184
    instance = self.instance
2185
    force = self.op.force
2186
    extra_args = getattr(self.op, "extra_args", "")
2187

    
2188
    self.cfg.MarkInstanceUp(instance.name)
2189

    
2190
    node_current = instance.primary_node
2191

    
2192
    _StartInstanceDisks(self, instance, force)
2193

    
2194
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2195
      _ShutdownInstanceDisks(self, instance)
2196
      raise errors.OpExecError("Could not start instance")
2197

    
2198

    
2199
class LURebootInstance(LogicalUnit):
2200
  """Reboot an instance.
2201

2202
  """
2203
  HPATH = "instance-reboot"
2204
  HTYPE = constants.HTYPE_INSTANCE
2205
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2206
  REQ_BGL = False
2207

    
2208
  def ExpandNames(self):
2209
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2210
                                   constants.INSTANCE_REBOOT_HARD,
2211
                                   constants.INSTANCE_REBOOT_FULL]:
2212
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2213
                                  (constants.INSTANCE_REBOOT_SOFT,
2214
                                   constants.INSTANCE_REBOOT_HARD,
2215
                                   constants.INSTANCE_REBOOT_FULL))
2216
    self._ExpandAndLockInstance()
2217
    self.needed_locks[locking.LEVEL_NODE] = []
2218
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2219

    
2220
  def DeclareLocks(self, level):
2221
    if level == locking.LEVEL_NODE:
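      # Only a full reboot re-assembles the instance's disks, which touches
      # the secondary nodes as well; soft/hard reboots only need the primary.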
2222
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2223
      self._LockInstancesNodes(primary_only=primary_only)
2224

    
2225
  def BuildHooksEnv(self):
2226
    """Build hooks env.
2227

2228
    This runs on master, primary and secondary nodes of the instance.
2229

2230
    """
2231
    env = {
2232
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2233
      }
2234
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2235
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2236
          list(self.instance.secondary_nodes))
2237
    return env, nl, nl
2238

    
2239
  def CheckPrereq(self):
2240
    """Check prerequisites.
2241

2242
    This checks that the instance is in the cluster.
2243

2244
    """
2245
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2246
    assert self.instance is not None, \
2247
      "Cannot retrieve locked instance %s" % self.op.instance_name
2248

    
2249
    # check bridge existence
2250
    _CheckInstanceBridgesExist(self, instance)
2251

    
2252
  def Exec(self, feedback_fn):
2253
    """Reboot the instance.
2254

2255
    """
2256
    instance = self.instance
2257
    ignore_secondaries = self.op.ignore_secondaries
2258
    reboot_type = self.op.reboot_type
2259
    extra_args = getattr(self.op, "extra_args", "")
2260

    
2261
    node_current = instance.primary_node
2262

    
2263
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2264
                       constants.INSTANCE_REBOOT_HARD]:
2265
      if not self.rpc.call_instance_reboot(node_current, instance,
2266
                                           reboot_type, extra_args):
2267
        raise errors.OpExecError("Could not reboot instance")
2268
    else:
2269
      if not self.rpc.call_instance_shutdown(node_current, instance):
2270
        raise errors.OpExecError("could not shutdown instance for full reboot")
2271
      _ShutdownInstanceDisks(self, instance)
2272
      _StartInstanceDisks(self, instance, ignore_secondaries)
2273
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2274
        _ShutdownInstanceDisks(self, instance)
2275
        raise errors.OpExecError("Could not start instance for full reboot")
2276

    
2277
    self.cfg.MarkInstanceUp(instance.name)
2278

    
2279

    
2280
class LUShutdownInstance(LogicalUnit):
2281
  """Shutdown an instance.
2282

2283
  """
2284
  HPATH = "instance-stop"
2285
  HTYPE = constants.HTYPE_INSTANCE
2286
  _OP_REQP = ["instance_name"]
2287
  REQ_BGL = False
2288

    
2289
  def ExpandNames(self):
2290
    self._ExpandAndLockInstance()
2291
    self.needed_locks[locking.LEVEL_NODE] = []
2292
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2293

    
2294
  def DeclareLocks(self, level):
2295
    if level == locking.LEVEL_NODE:
2296
      self._LockInstancesNodes()
2297

    
2298
  def BuildHooksEnv(self):
2299
    """Build hooks env.
2300

2301
    This runs on master, primary and secondary nodes of the instance.
2302

2303
    """
2304
    env = _BuildInstanceHookEnvByObject(self.instance)
2305
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2306
          list(self.instance.secondary_nodes))
2307
    return env, nl, nl
2308

    
2309
  def CheckPrereq(self):
2310
    """Check prerequisites.
2311

2312
    This checks that the instance is in the cluster.
2313

2314
    """
2315
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2316
    assert self.instance is not None, \
2317
      "Cannot retrieve locked instance %s" % self.op.instance_name
2318

    
2319
  def Exec(self, feedback_fn):
2320
    """Shutdown the instance.
2321

2322
    """
2323
    instance = self.instance
2324
    node_current = instance.primary_node
2325
    self.cfg.MarkInstanceDown(instance.name)
2326
    if not self.rpc.call_instance_shutdown(node_current, instance):
2327
      logger.Error("could not shutdown instance")
2328

    
2329
    _ShutdownInstanceDisks(self, instance)
2330

    
2331

    
2332
class LUReinstallInstance(LogicalUnit):
2333
  """Reinstall an instance.
2334

2335
  """
2336
  HPATH = "instance-reinstall"
2337
  HTYPE = constants.HTYPE_INSTANCE
2338
  _OP_REQP = ["instance_name"]
2339
  REQ_BGL = False
2340

    
2341
  def ExpandNames(self):
2342
    self._ExpandAndLockInstance()
2343
    self.needed_locks[locking.LEVEL_NODE] = []
2344
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2345

    
2346
  def DeclareLocks(self, level):
2347
    if level == locking.LEVEL_NODE:
2348
      self._LockInstancesNodes()
2349

    
2350
  def BuildHooksEnv(self):
2351
    """Build hooks env.
2352

2353
    This runs on master, primary and secondary nodes of the instance.
2354

2355
    """
2356
    env = _BuildInstanceHookEnvByObject(self.instance)
2357
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2358
          list(self.instance.secondary_nodes))
2359
    return env, nl, nl
2360

    
2361
  def CheckPrereq(self):
2362
    """Check prerequisites.
2363

2364
    This checks that the instance is in the cluster and is not running.
2365

2366
    """
2367
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2368
    assert instance is not None, \
2369
      "Cannot retrieve locked instance %s" % self.op.instance_name
2370

    
2371
    if instance.disk_template == constants.DT_DISKLESS:
2372
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2373
                                 self.op.instance_name)
2374
    if instance.status != "down":
2375
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2376
                                 self.op.instance_name)
2377
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2378
                                              instance.name,
2379
                                              instance.hypervisor)
2380
    if remote_info:
2381
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2382
                                 (self.op.instance_name,
2383
                                  instance.primary_node))
2384

    
2385
    self.op.os_type = getattr(self.op, "os_type", None)
2386
    if self.op.os_type is not None:
2387
      # OS verification
2388
      pnode = self.cfg.GetNodeInfo(
2389
        self.cfg.ExpandNodeName(instance.primary_node))
2390
      if pnode is None:
2391
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2392
                                   instance.primary_node)
2393
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2394
      if not os_obj:
2395
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2396
                                   " primary node"  % self.op.os_type)
2397

    
2398
    self.instance = instance
2399

    
2400
  def Exec(self, feedback_fn):
2401
    """Reinstall the instance.
2402

2403
    """
2404
    inst = self.instance
2405

    
2406
    if self.op.os_type is not None:
2407
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2408
      inst.os = self.op.os_type
2409
      self.cfg.Update(inst)
2410

    
2411
    _StartInstanceDisks(self, inst, None)
2412
    try:
2413
      feedback_fn("Running the instance OS create scripts...")
2414
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2415
                                           "sda", "sdb"):
2416
        raise errors.OpExecError("Could not install OS for instance %s"
2417
                                 " on node %s" %
2418
                                 (inst.name, inst.primary_node))
2419
    finally:
2420
      _ShutdownInstanceDisks(self, inst)
2421

    
2422

    
2423
class LURenameInstance(LogicalUnit):
2424
  """Rename an instance.
2425

2426
  """
2427
  HPATH = "instance-rename"
2428
  HTYPE = constants.HTYPE_INSTANCE
2429
  _OP_REQP = ["instance_name", "new_name"]
2430

    
2431
  def BuildHooksEnv(self):
2432
    """Build hooks env.
2433

2434
    This runs on master, primary and secondary nodes of the instance.
2435

2436
    """
2437
    env = _BuildInstanceHookEnvByObject(self.instance)
2438
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2439
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2440
          list(self.instance.secondary_nodes))
2441
    return env, nl, nl
2442

    
2443
  def CheckPrereq(self):
2444
    """Check prerequisites.
2445

2446
    This checks that the instance is in the cluster and is not running.
2447

2448
    """
2449
    instance = self.cfg.GetInstanceInfo(
2450
      self.cfg.ExpandInstanceName(self.op.instance_name))
2451
    if instance is None:
2452
      raise errors.OpPrereqError("Instance '%s' not known" %
2453
                                 self.op.instance_name)
2454
    if instance.status != "down":
2455
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2456
                                 self.op.instance_name)
2457
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2458
                                              instance.name,
2459
                                              instance.hypervisor)
2460
    if remote_info:
2461
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2462
                                 (self.op.instance_name,
2463
                                  instance.primary_node))
2464
    self.instance = instance
2465

    
2466
    # new name verification
2467
    name_info = utils.HostInfo(self.op.new_name)
2468

    
2469
    self.op.new_name = new_name = name_info.name
2470
    instance_list = self.cfg.GetInstanceList()
2471
    if new_name in instance_list:
2472
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2473
                                 new_name)
2474

    
2475
    if not getattr(self.op, "ignore_ip", False):
2476
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2477
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2478
                                   (name_info.ip, new_name))
2479

    
2480

    
2481
  def Exec(self, feedback_fn):
2482
    """Reinstall the instance.
2483

2484
    """
2485
    inst = self.instance
2486
    old_name = inst.name
2487

    
2488
    if inst.disk_template == constants.DT_FILE:
2489
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2490

    
2491
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2492
    # Change the instance lock. This is definitely safe while we hold the BGL
2493
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2494
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2495

    
2496
    # re-read the instance from the configuration after rename
2497
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2498

    
2499
    if inst.disk_template == constants.DT_FILE:
2500
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2501
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2502
                                                     old_file_storage_dir,
2503
                                                     new_file_storage_dir)
2504

    
2505
      if not result:
2506
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2507
                                 " directory '%s' to '%s' (but the instance"
2508
                                 " has been renamed in Ganeti)" % (
2509
                                 inst.primary_node, old_file_storage_dir,
2510
                                 new_file_storage_dir))
2511

    
2512
      if not result[0]:
2513
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2514
                                 " (but the instance has been renamed in"
2515
                                 " Ganeti)" % (old_file_storage_dir,
2516
                                               new_file_storage_dir))
2517

    
2518
    _StartInstanceDisks(self, inst, None)
2519
    try:
2520
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2521
                                               old_name,
2522
                                               "sda", "sdb"):
2523
        msg = ("Could not run OS rename script for instance %s on node %s"
2524
               " (but the instance has been renamed in Ganeti)" %
2525
               (inst.name, inst.primary_node))
2526
        logger.Error(msg)
2527
    finally:
2528
      _ShutdownInstanceDisks(self, inst)
2529

    
2530

    
2531
class LURemoveInstance(LogicalUnit):
2532
  """Remove an instance.
2533

2534
  """
2535
  HPATH = "instance-remove"
2536
  HTYPE = constants.HTYPE_INSTANCE
2537
  _OP_REQP = ["instance_name", "ignore_failures"]
2538
  REQ_BGL = False
2539

    
2540
  def ExpandNames(self):
2541
    self._ExpandAndLockInstance()
2542
    self.needed_locks[locking.LEVEL_NODE] = []
2543
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2544

    
2545
  def DeclareLocks(self, level):
2546
    if level == locking.LEVEL_NODE:
2547
      self._LockInstancesNodes()
2548

    
2549
  def BuildHooksEnv(self):
2550
    """Build hooks env.
2551

2552
    This runs on master, primary and secondary nodes of the instance.
2553

2554
    """
2555
    env = _BuildInstanceHookEnvByObject(self.instance)
2556
    nl = [self.cfg.GetMasterNode()]
2557
    return env, nl, nl
2558

    
2559
  def CheckPrereq(self):
2560
    """Check prerequisites.
2561

2562
    This checks that the instance is in the cluster.
2563

2564
    """
2565
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2566
    assert self.instance is not None, \
2567
      "Cannot retrieve locked instance %s" % self.op.instance_name
2568

    
2569
  def Exec(self, feedback_fn):
2570
    """Remove the instance.
2571

2572
    """
2573
    instance = self.instance
2574
    logger.Info("shutting down instance %s on node %s" %
2575
                (instance.name, instance.primary_node))
2576

    
2577
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2578
      if self.op.ignore_failures:
2579
        feedback_fn("Warning: can't shutdown instance")
2580
      else:
2581
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2582
                                 (instance.name, instance.primary_node))
2583

    
2584
    logger.Info("removing block devices for instance %s" % instance.name)
2585

    
2586
    if not _RemoveDisks(self, instance):
2587
      if self.op.ignore_failures:
2588
        feedback_fn("Warning: can't remove instance's disks")
2589
      else:
2590
        raise errors.OpExecError("Can't remove instance's disks")
2591

    
2592
    logger.Info("removing instance %s out of cluster config" % instance.name)
2593

    
2594
    self.cfg.RemoveInstance(instance.name)
2595
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2596

    
2597

    
2598
class LUQueryInstances(NoHooksLU):
2599
  """Logical unit for querying instances.
2600

2601
  """
2602
  _OP_REQP = ["output_fields", "names"]
2603
  REQ_BGL = False
2604

    
2605
  def ExpandNames(self):
2606
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2607
    self.static_fields = frozenset([
2608
      "name", "os", "pnode", "snodes",
2609
      "admin_state", "admin_ram",
2610
      "disk_template", "ip", "mac", "bridge",
2611
      "sda_size", "sdb_size", "vcpus", "tags",
2612
      "network_port", "kernel_path", "initrd_path",
2613
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2614
      "hvm_cdrom_image_path", "hvm_nic_type",
2615
      "hvm_disk_type", "vnc_bind_address",
2616
      "serial_no", "hypervisor",
2617
      ])
2618
    _CheckOutputFields(static=self.static_fields,
2619
                       dynamic=self.dynamic_fields,
2620
                       selected=self.op.output_fields)
2621

    
2622
    self.needed_locks = {}
2623
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2624
    self.share_locks[locking.LEVEL_NODE] = 1
2625

    
2626
    if self.op.names:
2627
      self.wanted = _GetWantedInstances(self, self.op.names)
2628
    else:
2629
      self.wanted = locking.ALL_SET
2630

    
2631
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2632
    if self.do_locking:
2633
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2634
      self.needed_locks[locking.LEVEL_NODE] = []
2635
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2636

    
2637
  def DeclareLocks(self, level):
2638
    if level == locking.LEVEL_NODE and self.do_locking:
2639
      self._LockInstancesNodes()
2640

    
2641
  def CheckPrereq(self):
2642
    """Check prerequisites.
2643

2644
    """
2645
    pass
2646

    
2647
  def Exec(self, feedback_fn):
2648
    """Computes the list of nodes and their attributes.
2649

2650
    """
2651
    all_info = self.cfg.GetAllInstancesInfo()
2652
    if self.do_locking:
2653
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2654
    elif self.wanted != locking.ALL_SET:
2655
      instance_names = self.wanted
2656
      missing = set(instance_names).difference(all_info.keys())
2657
      if missing:
2658
        raise errors.OpExecError(
2659
          "Some instances were removed before retrieving their data: %s"
2660
          % missing)
2661
    else:
2662
      instance_names = all_info.keys()
2663
    instance_list = [all_info[iname] for iname in instance_names]
2664

    
2665
    # begin data gathering
2666

    
2667
    nodes = frozenset([inst.primary_node for inst in instance_list])
2668
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2669

    
2670
    bad_nodes = []
2671
    if self.dynamic_fields.intersection(self.op.output_fields):
2672
      live_data = {}
2673
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2674
      for name in nodes:
2675
        result = node_data[name]
2676
        if result:
2677
          live_data.update(result)
2678
        elif result == False:
2679
          bad_nodes.append(name)
2680
        # else no instance is alive
2681
    else:
2682
      live_data = dict([(name, {}) for name in instance_names])
2683

    
2684
    # end data gathering
2685

    
2686
    output = []
2687
    for instance in instance_list:
2688
      iout = []
2689
      for field in self.op.output_fields:
2690
        if field == "name":
2691
          val = instance.name
2692
        elif field == "os":
2693
          val = instance.os
2694
        elif field == "pnode":
2695
          val = instance.primary_node
2696
        elif field == "snodes":
2697
          val = list(instance.secondary_nodes)
2698
        elif field == "admin_state":
2699
          val = (instance.status != "down")
2700
        elif field == "oper_state":
2701
          if instance.primary_node in bad_nodes:
2702
            val = None
2703
          else:
2704
            val = bool(live_data.get(instance.name))
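        # The "status" field combines admin and live state: "running",
        # "ERROR_up", "ERROR_down", "ADMIN_down", or "ERROR_nodedown" when
        # the primary node could not be contacted.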
2705
        elif field == "status":
2706
          if instance.primary_node in bad_nodes:
2707
            val = "ERROR_nodedown"
2708
          else:
2709
            running = bool(live_data.get(instance.name))
2710
            if running:
2711
              if instance.status != "down":
2712
                val = "running"
2713
              else:
2714
                val = "ERROR_up"
2715
            else:
2716
              if instance.status != "down":
2717
                val = "ERROR_down"
2718
              else:
2719
                val = "ADMIN_down"
2720
        elif field == "admin_ram":
2721
          val = instance.memory
2722
        elif field == "oper_ram":
2723
          if instance.primary_node in bad_nodes:
2724
            val = None
2725
          elif instance.name in live_data:
2726
            val = live_data[instance.name].get("memory", "?")
2727
          else:
2728
            val = "-"
2729
        elif field == "disk_template":
2730
          val = instance.disk_template
2731
        elif field == "ip":
2732
          val = instance.nics[0].ip
2733
        elif field == "bridge":
2734
          val = instance.nics[0].bridge
2735
        elif field == "mac":
2736
          val = instance.nics[0].mac
2737
        elif field == "sda_size" or field == "sdb_size":
2738
          disk = instance.FindDisk(field[:3])
2739
          if disk is None:
2740
            val = None
2741
          else:
2742
            val = disk.size
2743
        elif field == "vcpus":
2744
          val = instance.vcpus
2745
        elif field == "tags":
2746
          val = list(instance.GetTags())
2747
        elif field == "serial_no":
2748
          val = instance.serial_no
2749
        elif field in ("network_port", "kernel_path", "initrd_path",
2750
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2751
                       "hvm_cdrom_image_path", "hvm_nic_type",
2752
                       "hvm_disk_type", "vnc_bind_address"):
2753
          val = getattr(instance, field, None)
2754
          if val is not None:
2755
            pass
2756
          elif field in ("hvm_nic_type", "hvm_disk_type",
2757
                         "kernel_path", "initrd_path"):
2758
            val = "default"
2759
          else:
2760
            val = "-"
2761
        elif field == "hypervisor":
2762
          val = instance.hypervisor
2763
        else:
2764
          raise errors.ParameterError(field)
2765
        iout.append(val)
2766
      output.append(iout)
2767

    
2768
    return output
2769

    
2770

    
2771
class LUFailoverInstance(LogicalUnit):
2772
  """Failover an instance.
2773

2774
  """
2775
  HPATH = "instance-failover"
2776
  HTYPE = constants.HTYPE_INSTANCE
2777
  _OP_REQP = ["instance_name", "ignore_consistency"]
2778
  REQ_BGL = False
2779

    
2780
  def ExpandNames(self):
2781
    self._ExpandAndLockInstance()
2782
    self.needed_locks[locking.LEVEL_NODE] = []
2783
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2784

    
2785
  def DeclareLocks(self, level):
2786
    if level == locking.LEVEL_NODE:
2787
      self._LockInstancesNodes()
2788

    
2789
  def BuildHooksEnv(self):
2790
    """Build hooks env.
2791

2792
    This runs on master, primary and secondary nodes of the instance.
2793

2794
    """
2795
    env = {
2796
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2797
      }
2798
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2799
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2800
    return env, nl, nl
2801

    
2802
  def CheckPrereq(self):
2803
    """Check prerequisites.
2804

2805
    This checks that the instance is in the cluster.
2806

2807
    """
2808
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2809
    assert self.instance is not None, \
2810
      "Cannot retrieve locked instance %s" % self.op.instance_name
2811

    
2812
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2813
      raise errors.OpPrereqError("Instance's disk layout is not"
2814
                                 " network mirrored, cannot failover.")
2815

    
2816
    secondary_nodes = instance.secondary_nodes
2817
    if not secondary_nodes:
2818
      raise errors.ProgrammerError("no secondary node but using "
2819
                                   "a mirrored disk template")
2820

    
2821
    target_node = secondary_nodes[0]
2822
    # check memory requirements on the secondary node
2823
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2824
                         instance.name, instance.memory,
2825
                         instance.hypervisor)
2826

    
2827
    # check bridge existence
2828
    brlist = [nic.bridge for nic in instance.nics]
2829
    if not self.rpc.call_bridges_exist(target_node, brlist):
2830
      raise errors.OpPrereqError("One or more target bridges %s does not"
2831
                                 " exist on destination node '%s'" %
2832
                                 (brlist, target_node))
2833

    
2834
  def Exec(self, feedback_fn):
2835
    """Failover an instance.
2836

2837
    The failover is done by shutting it down on its present node and
2838
    starting it on the secondary.
2839

2840
    """
2841
    instance = self.instance
2842

    
2843
    source_node = instance.primary_node
2844
    target_node = instance.secondary_nodes[0]
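    # Failover outline: check the disks are consistent on the target, shut
    # the instance down on the source, deactivate its disks there, flip
    # primary_node in the configuration, then (only if the instance was
    # marked up) re-assemble the disks and start it on the former secondary.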
2845

    
2846
    feedback_fn("* checking disk consistency between source and target")
2847
    for dev in instance.disks:
2848
      # for drbd, these are drbd over lvm
2849
      if not _CheckDiskConsistency(self, dev, target_node, False):
2850
        if instance.status == "up" and not self.op.ignore_consistency:
2851
          raise errors.OpExecError("Disk %s is degraded on target node,"
2852
                                   " aborting failover." % dev.iv_name)
2853

    
2854
    feedback_fn("* shutting down instance on source node")
2855
    logger.Info("Shutting down instance %s on node %s" %
2856
                (instance.name, source_node))
2857

    
2858
    if not self.rpc.call_instance_shutdown(source_node, instance):
2859
      if self.op.ignore_consistency:
2860
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2861
                     " anyway. Please make sure node %s is down"  %
2862
                     (instance.name, source_node, source_node))
2863
      else:
2864
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2865
                                 (instance.name, source_node))
2866

    
2867
    feedback_fn("* deactivating the instance's disks on source node")
2868
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2869
      raise errors.OpExecError("Can't shut down the instance's disks.")
2870

    
2871
    instance.primary_node = target_node
2872
    # distribute new instance config to the other nodes
2873
    self.cfg.Update(instance)
2874

    
2875
    # Only start the instance if it's marked as up
2876
    if instance.status == "up":
2877
      feedback_fn("* activating the instance's disks on target node")
2878
      logger.Info("Starting instance %s on node %s" %
2879
                  (instance.name, target_node))
2880

    
2881
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2882
                                               ignore_secondaries=True)
2883
      if not disks_ok:
2884
        _ShutdownInstanceDisks(self, instance)
2885
        raise errors.OpExecError("Can't activate the instance's disks")
2886

    
2887
      feedback_fn("* starting the instance on the target node")
2888
      if not self.rpc.call_instance_start(target_node, instance, None):
2889
        _ShutdownInstanceDisks(self, instance)
2890
        raise errors.OpExecError("Could not start instance %s on node %s." %
2891
                                 (instance.name, target_node))
2892

    
2893

    
2894
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2895
  """Create a tree of block devices on the primary node.
2896

2897
  This always creates all devices.
2898

2899
  """
2900
  if device.children:
2901
    for child in device.children:
2902
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2903
        return False
2904

    
2905
  lu.cfg.SetDiskID(device, node)
2906
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2907
                                       instance.name, True, info)
2908
  if not new_id:
2909
    return False
2910
  if device.physical_id is None:
2911
    device.physical_id = new_id
2912
  return True
2913

    
2914

    
2915
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2916
  """Create a tree of block devices on a secondary node.
2917

2918
  If this device type has to be created on secondaries, create it and
2919
  all its children.
2920

2921
  If not, just recurse to children keeping the same 'force' value.
2922

2923
  """
2924
  if device.CreateOnSecondary():
2925
    force = True
2926
  if device.children:
2927
    for child in device.children:
2928
      if not _CreateBlockDevOnSecondary(lu, node, instance,
2929
                                        child, force, info):
2930
        return False
2931

    
2932
  if not force:
2933
    return True
2934
  lu.cfg.SetDiskID(device, node)
2935
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2936
                                       instance.name, False, info)
2937
  if not new_id:
2938
    return False
2939
  if device.physical_id is None:
2940
    device.physical_id = new_id
2941
  return True
2942

    
2943

    
2944
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
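
# Example (illustrative sketch; the 10240 MB size and "sda" iv_name are made
# up): _GenerateDRBD8Branch returns a DRBD8 Disk whose logical_id wires the
# two nodes together and whose children are the data LV plus a fixed 128 MB
# metadata LV, roughly:
#
#   Disk(LD_DRBD8, size=10240,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        children=[Disk(LD_LV, size=10240, logical_id=(vgname, names[0])),
#                  Disk(LD_LV, size=128, logical_id=(vgname, names[1]))],
#        iv_name="sda")

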
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                      ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
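
# Example (illustrative sketch; the instance name, node name and sizes are
# made up): for a plain (LVM-only) instance the call boils down to two
# logical volumes, one for the system disk and one for swap; the file
# storage arguments are ignored for this template:
#
#   disks = _GenerateDiskTemplate(lu, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1.example.com",
#                                 [], 10240, 4096, None, None)
#   # -> [Disk(LD_LV, size=10240, iv_name="sda"),
#   #     Disk(LD_LV, size=4096, iv_name="sdb")]

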
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
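
# Example (illustrative sketch; the path below is made up): for file-based
# disks _CreateDisks recovers the per-instance storage directory from the
# first disk's logical_id before creating the individual disk files, e.g.
# with logical_id == (file_driver, "/srv/ganeti/file-storage/inst1/sda")
# the primary node is asked to create "/srv/ganeti/file-storage/inst1".

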
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
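
# Example (illustrative sketch; the sizes are made up): the requirement is
# simple arithmetic over the two drives, e.g. for a 10240 MB disk plus a
# 4096 MB swap volume:
#
#   >>> _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096)
#   14336
#   >>> _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)  # + 2 * 128 MB meta
#   14592
#   >>> _ComputeDiskSize(constants.DT_FILE, 10240, 4096) is None
#   True

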
class LUCreateInstance(LogicalUnit):
3149
  """Create an instance.
3150

3151
  """
3152
  HPATH = "instance-add"
3153
  HTYPE = constants.HTYPE_INSTANCE
3154
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3155
              "disk_template", "swap_size", "mode", "start", "vcpus",
3156
              "wait_for_sync", "ip_check", "mac", "hvparams"]
3157
  REQ_BGL = False
3158

    
3159
  def _ExpandNode(self, node):
3160
    """Expands and checks one node name.
3161

3162
    """
3163
    node_full = self.cfg.ExpandNodeName(node)
3164
    if node_full is None:
3165
      raise errors.OpPrereqError("Unknown node %s" % node)
3166
    return node_full
3167

    
3168
  def ExpandNames(self):
3169
    """ExpandNames for CreateInstance.
3170

3171
    Figure out the right locks for instance creation.
3172

3173
    """
3174
    self.needed_locks = {}
3175

    
3176
    # set optional parameters to none if they don't exist
3177
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3178
      if not hasattr(self.op, attr):
3179
        setattr(self.op, attr, None)
3180

    
3181
    # cheap checks, mostly valid constants given
3182

    
3183
    # verify creation mode
3184
    if self.op.mode not in (constants.INSTANCE_CREATE,
3185
                            constants.INSTANCE_IMPORT):
3186
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3187
                                 self.op.mode)
3188

    
3189
    # disk template and mirror node verification
3190
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3191
      raise errors.OpPrereqError("Invalid disk template name")
3192

    
3193
    if self.op.hypervisor is None:
3194
      self.op.hypervisor = self.cfg.GetHypervisorType()
3195

    
3196
    enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3197
    if self.op.hypervisor not in enabled_hvs:
3198
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3199
                                 " cluster (%s)" % (self.op.hypervisor,
3200
                                  ",".join(enabled_hvs)))
3201

    
3202
    # check hypervisor parameter syntax (locally)
3203

    
3204
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3205
    hv_type.CheckParameterSyntax(self.op.hvparams)
3206

    
3207
    #### instance parameters check
3208

    
3209
    # instance name verification
3210
    hostname1 = utils.HostInfo(self.op.instance_name)
3211
    self.op.instance_name = instance_name = hostname1.name
3212

    
3213
    # this is just a preventive check, but someone might still add this
3214
    # instance in the meantime, and creation will fail at lock-add time
3215
    if instance_name in self.cfg.GetInstanceList():
3216
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3217
                                 instance_name)
3218

    
3219
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3220

    
3221
    # ip validity checks
3222
    ip = getattr(self.op, "ip", None)
3223
    if ip is None or ip.lower() == "none":
3224
      inst_ip = None
3225
    elif ip.lower() == "auto":
3226
      inst_ip = hostname1.ip
3227
    else:
3228
      if not utils.IsValidIP(ip):
3229
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3230
                                   " like a valid IP" % ip)
3231
      inst_ip = ip
3232
    self.inst_ip = self.op.ip = inst_ip
3233
    # used in CheckPrereq for ip ping check
3234
    self.check_ip = hostname1.ip
3235

    
3236
    # MAC address verification
3237
    if self.op.mac != "auto":
3238
      if not utils.IsValidMac(self.op.mac.lower()):
3239
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3240
                                   self.op.mac)
3241

    
3242
    # file storage checks
3243
    if (self.op.file_driver and
3244
        not self.op.file_driver in constants.FILE_DRIVER):
3245
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3246
                                 self.op.file_driver)
3247

    
3248
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3249
      raise errors.OpPrereqError("File storage directory path not absolute")
3250

    
3251
    ### Node/iallocator related checks
3252
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3253
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3254
                                 " node must be given")
3255

    
3256
    if self.op.iallocator:
3257
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3258
    else:
3259
      self.op.pnode = self._ExpandNode(self.op.pnode)
3260
      nodelist = [self.op.pnode]
3261
      if self.op.snode is not None:
3262
        self.op.snode = self._ExpandNode(self.op.snode)
3263
        nodelist.append(self.op.snode)
3264
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3265

    
3266
    # in case of import lock the source node too
3267
    if self.op.mode == constants.INSTANCE_IMPORT:
3268
      src_node = getattr(self.op, "src_node", None)
3269
      src_path = getattr(self.op, "src_path", None)
3270

    
3271
      if src_node is None or src_path is None:
3272
        raise errors.OpPrereqError("Importing an instance requires source"
3273
                                   " node and path options")
3274

    
3275
      if not os.path.isabs(src_path):
3276
        raise errors.OpPrereqError("The source path must be absolute")
3277

    
3278
      self.op.src_node = src_node = self._ExpandNode(src_node)
3279
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3280
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3281

    
3282
    else: # INSTANCE_CREATE
3283
      if getattr(self.op, "os_type", None) is None:
3284
        raise errors.OpPrereqError("No guest OS specified")
3285

    
3286
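  # Example (illustrative sketch; "node1"/"node2" are made-up names): the
  # node locking set up in ExpandNames above depends on how the target nodes
  # are chosen.  With an iallocator they are unknown at this point, so every
  # node lock is taken; with explicit nodes only those are locked:
  #
  #   iallocator given:  self.needed_locks[locking.LEVEL_NODE] = \
  #                          locking.ALL_SET
  #   pnode/snode given: self.needed_locks[locking.LEVEL_NODE] = \
  #                          ["node1", "node2"]
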
  def _RunAllocator(self):
3287
    """Run the allocator based on input opcode.
3288

3289
    """
3290
    disks = [{"size": self.op.disk_size, "mode": "w"},
3291
             {"size": self.op.swap_size, "mode": "w"}]
3292
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3293
             "bridge": self.op.bridge}]
3294
    ial = IAllocator(self,
3295
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3296
                     name=self.op.instance_name,
3297
                     disk_template=self.op.disk_template,
3298
                     tags=[],
3299
                     os=self.op.os_type,
3300
                     vcpus=self.op.vcpus,
3301
                     mem_size=self.op.mem_size,
3302
                     disks=disks,
3303
                     nics=nics,
3304
                     )
3305

    
3306
    ial.Run(self.op.iallocator)
3307

    
3308
    if not ial.success:
3309
      raise errors.OpPrereqError("Can't compute nodes using"
3310
                                 " iallocator '%s': %s" % (self.op.iallocator,
3311
                                                           ial.info))
3312
    if len(ial.nodes) != ial.required_nodes:
3313
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3314
                                 " of nodes (%s), required %s" %
3315
                                 (self.op.iallocator, len(ial.nodes),
3316
                                  ial.required_nodes))
3317
    self.op.pnode = ial.nodes[0]
3318
    logger.ToStdout("Selected nodes for the instance: %s" %
3319
                    (", ".join(ial.nodes),))
3320
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3321
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3322
    if ial.required_nodes == 2:
3323
      self.op.snode = ial.nodes[1]
3324

    
3325
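  # Example (illustrative sketch; the instance/OS/node names, sizes and
  # bridge are made up): _RunAllocator above packs the opcode parameters
  # into an IAllocator request and uses the returned node list as primary
  # (and, for mirrored templates, secondary) node:
  #
  #   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    disk_template=constants.DT_DRBD8, tags=[],
  #                    os="debian-etch", vcpus=1, mem_size=512,
  #                    disks=[{"size": 10240, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    nics=[{"mac": "auto", "ip": None,
  #                           "bridge": "xen-br0"}])
  #   ial.Run(self.op.iallocator)
  #   # on success, ial.nodes might be ["node1.example.com",
  #   #                                 "node2.example.com"]
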
  def BuildHooksEnv(self):
3326
    """Build hooks env.
3327

3328
    This runs on master, primary and secondary nodes of the instance.
3329

3330
    """
3331
    env = {
3332
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3333
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3334
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3335
      "INSTANCE_ADD_MODE": self.op.mode,
3336
      }
3337
    if self.op.mode == constants.INSTANCE_IMPORT:
3338
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3339
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3340
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3341

    
3342
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3343
      primary_node=self.op.pnode,
3344
      secondary_nodes=self.secondaries,
3345
      status=self.instance_status,
3346
      os_type=self.op.os_type,
3347
      memory=self.op.mem_size,
3348
      vcpus=self.op.vcpus,
3349
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3350
    ))
3351

    
3352
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3353
          self.secondaries)
3354
    return env, nl, nl
3355

    
3356

    
3357
  def CheckPrereq(self):
3358
    """Check prerequisites.
3359

3360
    """
3361
    if (not self.cfg.GetVGName() and
3362
        self.op.disk_template not in constants.DTS_NOT_LVM):
3363
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3364
                                 " instances")
3365

    
3366

    
3367
    if self.op.mode == constants.INSTANCE_IMPORT:
3368
      src_node = self.op.src_node
3369
      src_path = self.op.src_path
3370

    
3371
      export_info = self.rpc.call_export_info(src_node, src_path)
3372

    
3373
      if not export_info:
3374
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3375

    
3376
      if not export_info.has_section(constants.INISECT_EXP):
3377
        raise errors.ProgrammerError("Corrupted export config")
3378

    
3379
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3380
      if (int(ei_version) != constants.EXPORT_VERSION):
3381
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3382
                                   (ei_version, constants.EXPORT_VERSION))
3383

    
3384
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3385
        raise errors.OpPrereqError("Can't import instance with more than"
3386
                                   " one data disk")
3387

    
3388
      # FIXME: are the old os-es, disk sizes, etc. useful?
3389
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3390
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3391
                                                         'disk0_dump'))
3392
      self.src_image = diskimage
3393

    
3394
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3395

    
3396
    if self.op.start and not self.op.ip_check:
3397
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3398
                                 " adding an instance in start mode")
3399

    
3400
    if self.op.ip_check:
3401
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3402
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3403
                                   (self.check_ip, self.op.instance_name))
3404

    
3405
    # bridge verification
3406
    bridge = getattr(self.op, "bridge", None)
3407
    if bridge is None:
3408
      self.op.bridge = self.cfg.GetDefBridge()
3409
    else:
3410
      self.op.bridge = bridge
3411

    
3412
    #### allocator run
3413

    
3414
    if self.op.iallocator is not None:
3415
      self._RunAllocator()
3416

    
3417
    #### node related checks
3418

    
3419
    # check primary node
3420
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3421
    assert self.pnode is not None, \
3422
      "Cannot retrieve locked node %s" % self.op.pnode
3423
    self.secondaries = []
3424

    
3425
    # mirror node verification
3426
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3427
      if self.op.snode is None:
3428
        raise errors.OpPrereqError("The networked disk templates need"
3429
                                   " a mirror node")
3430
      if self.op.snode == pnode.name:
3431
        raise errors.OpPrereqError("The secondary node cannot be"
3432
                                   " the primary node.")
3433
      self.secondaries.append(self.op.snode)
3434

    
3435
    nodenames = [pnode.name] + self.secondaries
3436

    
3437
    req_size = _ComputeDiskSize(self.op.disk_template,
3438
                                self.op.disk_size, self.op.swap_size)
3439

    
3440
    # Check lv size requirements
3441
    if req_size is not None:
3442
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3443
                                         self.op.hypervisor)
3444
      for node in nodenames:
3445
        info = nodeinfo.get(node, None)
3446
        if not info:
3447
          raise errors.OpPrereqError("Cannot get current information"
3448
                                     " from node '%s'" % node)
3449
        vg_free = info.get('vg_free', None)
3450
        if not isinstance(vg_free, int):
3451
          raise errors.OpPrereqError("Can't compute free disk space on"
3452
                                     " node %s" % node)
3453
        if req_size > info['vg_free']:
3454
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3455
                                     " %d MB available, %d MB required" %
3456
                                     (node, info['vg_free'], req_size))
3457

    
3458
    # hypervisor parameter validation
3459
    hvinfo = self.rpc.call_hypervisor_validate_params(nodenames,
3460
                                                      self.op.hypervisor,
3461
                                                      self.op.hvparams)
3462
    for node in nodenames:
3463
      info = hvinfo.get(node, None)
3464
      if not info or not isinstance(info, (tuple, list)):
3465
        raise errors.OpPrereqError("Cannot get current information"
3466
                                   " from node '%s' (%s)" % (node, info))
3467
      if not info[0]:
3468
        raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3469
                                   " %s" % info[1])
3470

    
3471
    # os verification
3472
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3473
    if not os_obj:
3474
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3475
                                 " primary node"  % self.op.os_type)
3476

    
3477
    # bridge check on primary node
3478
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3479
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3480
                                 " destination node '%s'" %
3481
                                 (self.op.bridge, pnode.name))
3482

    
3483
    # memory check on primary node
3484
    if self.op.start:
3485
      _CheckNodeFreeMemory(self, self.pnode.name,
3486
                           "creating instance %s" % self.op.instance_name,
3487
                           self.op.mem_size, self.op.hypervisor)
3488

    
3489
    if self.op.start:
3490
      self.instance_status = 'up'
3491
    else:
3492
      self.instance_status = 'down'
3493

    
3494
  def Exec(self, feedback_fn):
3495
    """Create and add the instance to the cluster.
3496

3497
    """
3498
    instance = self.op.instance_name
3499
    pnode_name = self.pnode.name
3500

    
3501
    if self.op.mac == "auto":
3502
      mac_address = self.cfg.GenerateMAC()
3503
    else:
3504
      mac_address = self.op.mac
3505

    
3506
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3507
    if self.inst_ip is not None:
3508
      nic.ip = self.inst_ip
3509

    
3510
    ht_kind = self.op.hypervisor
3511
    if ht_kind in constants.HTS_REQ_PORT:
3512
      network_port = self.cfg.AllocatePort()
3513
    else:
3514
      network_port = None
3515

    
3516
    ##if self.op.vnc_bind_address is None:
3517
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3518

    
3519
    # this is needed because os.path.join does not accept None arguments
3520
    if self.op.file_storage_dir is None:
3521
      string_file_storage_dir = ""
3522
    else:
3523
      string_file_storage_dir = self.op.file_storage_dir
3524

    
3525
    # build the full file storage dir path
3526
    file_storage_dir = os.path.normpath(os.path.join(
3527
                                        self.cfg.GetFileStorageDir(),
3528
                                        string_file_storage_dir, instance))
3529

    
3530

    
3531
    disks = _GenerateDiskTemplate(self,
3532
                                  self.op.disk_template,
3533
                                  instance, pnode_name,
3534
                                  self.secondaries, self.op.disk_size,
3535
                                  self.op.swap_size,
3536
                                  file_storage_dir,
3537
                                  self.op.file_driver)
3538

    
3539
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3540
                            primary_node=pnode_name,
3541
                            memory=self.op.mem_size,
3542
                            vcpus=self.op.vcpus,
3543
                            nics=[nic], disks=disks,
3544
                            disk_template=self.op.disk_template,
3545
                            status=self.instance_status,
3546
                            network_port=network_port,
3547
                            hvparams=self.op.hvparams,
3548
                            hypervisor=self.op.hypervisor,
3549
                            )
3550

    
3551
    feedback_fn("* creating instance disks...")
3552
    if not _CreateDisks(self, iobj):
3553
      _RemoveDisks(self, iobj)
3554
      self.cfg.ReleaseDRBDMinors(instance)
3555
      raise errors.OpExecError("Device creation failed, reverting...")
3556

    
3557
    feedback_fn("adding instance %s to cluster config" % instance)
3558

    
3559
    self.cfg.AddInstance(iobj)
3560
    # Declare that we don't want to remove the instance lock anymore, as we've
3561
    # added the instance to the config
3562
    del self.remove_locks[locking.LEVEL_INSTANCE]
3563
    # Remove the temp. assignments for the instance's drbds
3564
    self.cfg.ReleaseDRBDMinors(instance)
3565

    
3566
    if self.op.wait_for_sync:
3567
      disk_abort = not _WaitForSync(self, iobj)
3568
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3569
      # make sure the disks are not degraded (still sync-ing is ok)
3570
      time.sleep(15)
3571
      feedback_fn("* checking mirrors status")
3572
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3573
    else:
3574
      disk_abort = False
3575

    
3576
    if disk_abort:
3577
      _RemoveDisks(self, iobj)
3578
      self.cfg.RemoveInstance(iobj.name)
3579
      # Make sure the instance lock gets removed
3580
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3581
      raise errors.OpExecError("There are some degraded disks for"
3582
                               " this instance")
3583

    
3584
    feedback_fn("creating os for instance %s on node %s" %
3585
                (instance, pnode_name))
3586

    
3587
    if iobj.disk_template != constants.DT_DISKLESS:
3588
      if self.op.mode == constants.INSTANCE_CREATE:
3589
        feedback_fn("* running the instance OS create scripts...")
3590
        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3591
          raise errors.OpExecError("could not add os for instance %s"
3592
                                   " on node %s" %
3593
                                   (instance, pnode_name))
3594

    
3595
      elif self.op.mode == constants.INSTANCE_IMPORT:
3596
        feedback_fn("* running the instance OS import scripts...")
3597
        src_node = self.op.src_node
3598
        src_image = self.src_image
3599
        cluster_name = self.cfg.GetClusterName()
3600
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3601
                                                src_node, src_image,
3602
                                                cluster_name):
3603
          raise errors.OpExecError("Could not import os for instance"
3604
                                   " %s on node %s" %
3605
                                   (instance, pnode_name))
3606
      else:
3607
        # also checked in the prereq part
3608
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3609
                                     % self.op.mode)
3610

    
3611
    if self.op.start:
3612
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3613
      feedback_fn("* starting instance...")
3614
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3615
        raise errors.OpExecError("Could not start instance")
3616

    
3617

    
3618
class LUConnectConsole(NoHooksLU):
3619
  """Connect to an instance's console.
3620

3621
  This is somewhat special in that it returns the command line that
3622
  you need to run on the master node in order to connect to the
3623
  console.
3624

3625
  """
3626
  _OP_REQP = ["instance_name"]
3627
  REQ_BGL = False
3628

    
3629
  def ExpandNames(self):
3630
    self._ExpandAndLockInstance()
3631

    
3632
  def CheckPrereq(self):
3633
    """Check prerequisites.
3634

3635
    This checks that the instance is in the cluster.
3636

3637
    """
3638
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3639
    assert self.instance is not None, \
3640
      "Cannot retrieve locked instance %s" % self.op.instance_name
3641

    
3642
  def Exec(self, feedback_fn):
3643
    """Connect to the console of an instance
3644

3645
    """
3646
    instance = self.instance
3647
    node = instance.primary_node
3648

    
3649
    node_insts = self.rpc.call_instance_list([node],
3650
                                             [instance.hypervisor])[node]
3651
    if node_insts is False:
3652
      raise errors.OpExecError("Can't connect to node %s." % node)
3653

    
3654
    if instance.name not in node_insts:
3655
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3656

    
3657
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3658

    
3659
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3660
    console_cmd = hyper.GetShellCommandForConsole(instance)
3661

    
3662
    # build ssh cmdline
3663
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3664

    
3665

    
3666
class LUReplaceDisks(LogicalUnit):
3667
  """Replace the disks of an instance.
3668

3669
  """
3670
  HPATH = "mirrors-replace"
3671
  HTYPE = constants.HTYPE_INSTANCE
3672
  _OP_REQP = ["instance_name", "mode", "disks"]
3673
  REQ_BGL = False
3674

    
3675
  def ExpandNames(self):
3676
    self._ExpandAndLockInstance()
3677

    
3678
    if not hasattr(self.op, "remote_node"):
3679
      self.op.remote_node = None
3680

    
3681
    ia_name = getattr(self.op, "iallocator", None)
3682
    if ia_name is not None:
3683
      if self.op.remote_node is not None:
3684
        raise errors.OpPrereqError("Give either the iallocator or the new"
3685
                                   " secondary, not both")
3686
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3687
    elif self.op.remote_node is not None:
3688
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3689
      if remote_node is None:
3690
        raise errors.OpPrereqError("Node '%s' not known" %
3691
                                   self.op.remote_node)
3692
      self.op.remote_node = remote_node
3693
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3694
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3695
    else:
3696
      self.needed_locks[locking.LEVEL_NODE] = []
3697
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3698

    
3699
  def DeclareLocks(self, level):
3700
    # If we're not already locking all nodes in the set we have to declare the
3701
    # instance's primary/secondary nodes.
3702
    if (level == locking.LEVEL_NODE and
3703
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3704
      self._LockInstancesNodes()
3705

    
3706
  def _RunAllocator(self):
3707
    """Compute a new secondary node using an IAllocator.
3708

3709
    """
3710
    ial = IAllocator(self,
3711
                     mode=constants.IALLOCATOR_MODE_RELOC,
3712
                     name=self.op.instance_name,
3713
                     relocate_from=[self.sec_node])
3714

    
3715
    ial.Run(self.op.iallocator)
3716

    
3717
    if not ial.success:
3718
      raise errors.OpPrereqError("Can't compute nodes using"
3719
                                 " iallocator '%s': %s" % (self.op.iallocator,
3720
                                                           ial.info))
3721
    if len(ial.nodes) != ial.required_nodes:
3722
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3723
                                 " of nodes (%s), required %s" %
3724
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3725
    self.op.remote_node = ial.nodes[0]
3726
    logger.ToStdout("Selected new secondary for the instance: %s" %
3727
                    self.op.remote_node)
3728

    
3729
  def BuildHooksEnv(self):
3730
    """Build hooks env.
3731

3732
    This runs on the master, the primary and all the secondaries.
3733

3734
    """
3735
    env = {
3736
      "MODE": self.op.mode,
3737
      "NEW_SECONDARY": self.op.remote_node,
3738
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3739
      }
3740
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3741
    nl = [
3742
      self.cfg.GetMasterNode(),
3743
      self.instance.primary_node,
3744
      ]
3745
    if self.op.remote_node is not None:
3746
      nl.append(self.op.remote_node)
3747
    return env, nl, nl
3748

    
3749
  def CheckPrereq(self):
3750
    """Check prerequisites.
3751

3752
    This checks that the instance is in the cluster.
3753

3754
    """
3755
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3756
    assert instance is not None, \
3757
      "Cannot retrieve locked instance %s" % self.op.instance_name
3758
    self.instance = instance
3759

    
3760
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3761
      raise errors.OpPrereqError("Instance's disk layout is not"
3762
                                 " network mirrored.")
3763

    
3764
    if len(instance.secondary_nodes) != 1:
3765
      raise errors.OpPrereqError("The instance has a strange layout,"
3766
                                 " expected one secondary but found %d" %
3767
                                 len(instance.secondary_nodes))
3768

    
3769
    self.sec_node = instance.secondary_nodes[0]
3770

    
3771
    ia_name = getattr(self.op, "iallocator", None)
3772
    if ia_name is not None:
3773
      self._RunAllocator()
3774

    
3775
    remote_node = self.op.remote_node
3776
    if remote_node is not None:
3777
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3778
      assert self.remote_node_info is not None, \
3779
        "Cannot retrieve locked node %s" % remote_node
3780
    else:
3781
      self.remote_node_info = None
3782
    if remote_node == instance.primary_node:
3783
      raise errors.OpPrereqError("The specified node is the primary node of"
3784
                                 " the instance.")
3785
    elif remote_node == self.sec_node:
3786
      if self.op.mode == constants.REPLACE_DISK_SEC:
3787
        # this is for DRBD8, where we can't execute the same mode of
3788
        # replacement as for drbd7 (no different port allocated)
3789
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3790
                                   " replacement")
3791
    if instance.disk_template == constants.DT_DRBD8:
3792
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3793
          remote_node is not None):
3794
        # switch to replace secondary mode
3795
        self.op.mode = constants.REPLACE_DISK_SEC
3796

    
3797
      if self.op.mode == constants.REPLACE_DISK_ALL:
3798
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3799
                                   " secondary disk replacement, not"
3800
                                   " both at once")
3801
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3802
        if remote_node is not None:
3803
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3804
                                     " the secondary while doing a primary"
3805
                                     " node disk replacement")
3806
        self.tgt_node = instance.primary_node
3807
        self.oth_node = instance.secondary_nodes[0]
3808
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3809
        self.new_node = remote_node # this can be None, in which case
3810
                                    # we don't change the secondary
3811
        self.tgt_node = instance.secondary_nodes[0]
3812
        self.oth_node = instance.primary_node
3813
      else:
3814
        raise errors.ProgrammerError("Unhandled disk replace mode")
3815

    
3816
    for name in self.op.disks:
3817
      if instance.FindDisk(name) is None:
3818
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3819
                                   (name, instance.name))
3820

    
3821
  def _ExecD8DiskOnly(self, feedback_fn):
3822
    """Replace a disk on the primary or secondary for dbrd8.
3823

3824
    The algorithm for replace is quite complicated:
3825
      - for each disk to be replaced:
3826
        - create new LVs on the target node with unique names
3827
        - detach old LVs from the drbd device
3828
        - rename old LVs to name_replaced.<time_t>
3829
        - rename new LVs to old LVs
3830
        - attach the new LVs (with the old names now) to the drbd device
3831
      - wait for sync across all devices
3832
      - for each modified disk:
3833
        - remove old LVs (which have the name name_replaces.<time_t>)
3834

3835
    Failures are not very well handled.
3836

3837
    """
3838
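    # Example (illustrative sketch; the volume group and LV names are made
    # up): the detach/rename/attach sequence below swaps the new LVs in
    # under the old names, e.g. for the data LV of disk "sda":
    #
    #   old LV  xenvg/<old-id>.sda_data
    #             -> xenvg/<old-id>.sda_data_replaced-<time_t>
    #   new LV  xenvg/<new-id>.sda_data
    #             -> xenvg/<old-id>.sda_data
    #
    # so the DRBD device ends up attached to fresh storage carrying the old
    # logical volume names, and the "_replaced-*" LVs are removed in step 6.
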
    steps_total = 6
3839
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3840
    instance = self.instance
3841
    iv_names = {}
3842
    vgname = self.cfg.GetVGName()
3843
    # start of work
3844
    cfg = self.cfg
3845
    tgt_node = self.tgt_node
3846
    oth_node = self.oth_node
3847

    
3848
    # Step: check device activation
3849
    self.proc.LogStep(1, steps_total, "check device existence")
3850
    info("checking volume groups")
3851
    my_vg = cfg.GetVGName()
3852
    results = self.rpc.call_vg_list([oth_node, tgt_node])
3853
    if not results:
3854
      raise errors.OpExecError("Can't list volume groups on the nodes")
3855
    for node in oth_node, tgt_node:
3856
      res = results.get(node, False)
3857
      if not res or my_vg not in res:
3858
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3859
                                 (my_vg, node))
3860
    for dev in instance.disks:
3861
      if not dev.iv_name in self.op.disks:
3862
        continue
3863
      for node in tgt_node, oth_node:
3864
        info("checking %s on %s" % (dev.iv_name, node))
3865
        cfg.SetDiskID(dev, node)
3866
        if not self.rpc.call_blockdev_find(node, dev):
3867
          raise errors.OpExecError("Can't find device %s on node %s" %
3868
                                   (dev.iv_name, node))
3869

    
3870
    # Step: check other node consistency
3871
    self.proc.LogStep(2, steps_total, "check peer consistency")
3872
    for dev in instance.disks:
3873
      if not dev.iv_name in self.op.disks:
3874
        continue
3875
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3876
      if not _CheckDiskConsistency(self, dev, oth_node,
3877
                                   oth_node==instance.primary_node):
3878
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3879
                                 " to replace disks on this node (%s)" %
3880
                                 (oth_node, tgt_node))
3881

    
3882
    # Step: create new storage
3883
    self.proc.LogStep(3, steps_total, "allocate new storage")
3884
    for dev in instance.disks:
3885
      if not dev.iv_name in self.op.disks:
3886
        continue
3887
      size = dev.size
3888
      cfg.SetDiskID(dev, tgt_node)
3889
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3890
      names = _GenerateUniqueNames(self, lv_names)
3891
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3892
                             logical_id=(vgname, names[0]))
3893
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3894
                             logical_id=(vgname, names[1]))
3895
      new_lvs = [lv_data, lv_meta]
3896
      old_lvs = dev.children
3897
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3898
      info("creating new local storage on %s for %s" %
3899
           (tgt_node, dev.iv_name))
3900
      # since we *always* want to create this LV, we use the
3901
      # _Create...OnPrimary (which forces the creation), even if we
3902
      # are talking about the secondary node
3903
      for new_lv in new_lvs:
3904
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3905
                                        _GetInstanceInfoText(instance)):
3906
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3907
                                   " node '%s'" %
3908
                                   (new_lv.logical_id[1], tgt_node))
3909

    
3910
    # Step: for each lv, detach+rename*2+attach
3911
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3912
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3913
      info("detaching %s drbd from local storage" % dev.iv_name)
3914
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3915
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3916
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3917
      #dev.children = []
3918
      #cfg.Update(instance)
3919

    
3920
      # ok, we created the new LVs, so now we know we have the needed
3921
      # storage; as such, we proceed on the target node to rename
3922
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3923
      # using the assumption that logical_id == physical_id (which in
3924
      # turn is the unique_id on that node)
3925

    
3926
      # FIXME(iustin): use a better name for the replaced LVs
3927
      temp_suffix = int(time.time())
3928
      ren_fn = lambda d, suff: (d.physical_id[0],
3929
                                d.physical_id[1] + "_replaced-%s" % suff)
3930
      # build the rename list based on what LVs exist on the node
3931
      rlist = []
3932
      for to_ren in old_lvs:
3933
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3934
        if find_res is not None: # device exists
3935
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3936

    
3937
      info("renaming the old LVs on the target node")
3938
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3939
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3940
      # now we rename the new LVs to the old LVs
3941
      info("renaming the new LVs on the target node")
3942
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3943
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3944
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3945

    
3946
      for old, new in zip(old_lvs, new_lvs):
3947
        new.logical_id = old.logical_id
3948
        cfg.SetDiskID(new, tgt_node)
3949

    
3950
      for disk in old_lvs:
3951
        disk.logical_id = ren_fn(disk, temp_suffix)
3952
        cfg.SetDiskID(disk, tgt_node)
3953

    
3954
      # now that the new lvs have the old name, we can add them to the device
3955
      info("adding new mirror component on %s" % tgt_node)
3956
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3957
        for new_lv in new_lvs:
3958
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
3959
            warning("Can't rollback device %s", hint="manually cleanup unused"
3960
                    " logical volumes")
3961
        raise errors.OpExecError("Can't add local storage to drbd")
3962

    
3963
      dev.children = new_lvs
3964
      cfg.Update(instance)
3965

    
3966
    # Step: wait for sync
3967

    
3968
    # this can fail as the old devices are degraded and _WaitForSync
3969
    # does a combined result over all disks, so we don't check its
3970
    # return value
3971
    self.proc.LogStep(5, steps_total, "sync devices")
3972
    _WaitForSync(self, instance, unlock=True)
3973

    
3974
    # so check manually all the devices
3975
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3976
      cfg.SetDiskID(dev, instance.primary_node)
3977
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
3978
      if is_degr:
3979
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3980

    
3981
    # Step: remove old storage
3982
    self.proc.LogStep(6, steps_total, "removing old storage")
3983
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3984
      info("remove logical volumes for %s" % name)
3985
      for lv in old_lvs:
3986
        cfg.SetDiskID(lv, tgt_node)
3987
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
3988
          warning("Can't remove old LV", hint="manually remove unused LVs")
3989
          continue
3990

    
3991
  def _ExecD8Secondary(self, feedback_fn):
3992
    """Replace the secondary node for drbd8.
3993

3994
    The algorithm for replace is quite complicated:
3995
      - for all disks of the instance:
3996
        - create new LVs on the new node with same names
3997
        - shutdown the drbd device on the old secondary
3998
        - disconnect the drbd network on the primary
3999
        - create the drbd device on the new secondary
4000
        - network attach the drbd on the primary, using an artifice:
4001
          the drbd code for Attach() will connect to the network if it
4002
          finds a device which is connected to the good local disks but
4003
          not network enabled
4004
      - wait for sync across all devices
4005
      - remove all disks from the old secondary
4006

4007
    Failures are not very well handled.
4008

4009
    """
4010
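    # Example (illustrative sketch; node names, port and minors are made
    # up): replacing the secondary keeps the primary's side of each DRBD
    # logical_id and swaps in the new peer plus a freshly allocated minor:
    #
    #   old logical_id: ("node1", "node2", 11000, 0, 0, shared_secret)
    #   new logical_id: ("node1", "node3", 11000, 0, 1, shared_secret)
    #
    # i.e. only the peer node and the minor on that peer change; the port
    # and the shared secret are reused.
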
    steps_total = 6
4011
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4012
    instance = self.instance
4013
    iv_names = {}
4014
    vgname = self.cfg.GetVGName()
4015
    # start of work
4016
    cfg = self.cfg
4017
    old_node = self.tgt_node
4018
    new_node = self.new_node
4019
    pri_node = instance.primary_node
4020

    
4021
    # Step: check device activation
4022
    self.proc.LogStep(1, steps_total, "check device existence")
4023
    info("checking volume groups")
4024
    my_vg = cfg.GetVGName()
4025
    results = self.rpc.call_vg_list([pri_node, new_node])
4026
    if not results:
4027
      raise errors.OpExecError("Can't list volume groups on the nodes")
4028
    for node in pri_node, new_node:
4029
      res = results.get(node, False)
4030
      if not res or my_vg not in res:
4031
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4032
                                 (my_vg, node))
4033
    for dev in instance.disks:
4034
      if not dev.iv_name in self.op.disks:
4035
        continue
4036
      info("checking %s on %s" % (dev.iv_name, pri_node))
4037
      cfg.SetDiskID(dev, pri_node)
4038
      if not self.rpc.call_blockdev_find(pri_node, dev):
4039
        raise errors.OpExecError("Can't find device %s on node %s" %
4040
                                 (dev.iv_name, pri_node))
4041

    
4042
    # Step: check other node consistency
4043
    self.proc.LogStep(2, steps_total, "check peer consistency")
4044
    for dev in instance.disks:
4045
      if not dev.iv_name in self.op.disks:
4046
        continue
4047
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4048
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4049
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4050
                                 " unsafe to replace the secondary" %
4051
                                 pri_node)
4052

    
4053
    # Step: create new storage
4054
    self.proc.LogStep(3, steps_total, "allocate new storage")
4055
    for dev in instance.disks:
4056
      size = dev.size
4057
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4058
      # since we *always* want to create this LV, we use the
4059
      # _Create...OnPrimary (which forces the creation), even if we
4060
      # are talking about the secondary node
4061
      for new_lv in dev.children:
4062
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4063
                                        _GetInstanceInfoText(instance)):
4064
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4065
                                   " node '%s'" %
4066
                                   (new_lv.logical_id[1], new_node))
4067

    
4068

    
4069
    # Step 4: drbd minors and drbd setup changes
4070
    # after this, we must manually remove the drbd minors on both the
4071
    # error and the success paths
4072
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4073
                                   instance.name)
4074
    logging.debug("Allocated minors %s" % (minors,))
4075
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4076
    for dev, new_minor in zip(instance.disks, minors):
4077
      size = dev.size
4078
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4079
      # create new devices on new_node
4080
      if pri_node == dev.logical_id[0]:
4081
        new_logical_id = (pri_node, new_node,
4082
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4083
                          dev.logical_id[5])
4084
      else:
4085
        new_logical_id = (new_node, pri_node,
4086
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4087
                          dev.logical_id[5])
4088
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4089
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4090
                    new_logical_id)
4091
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4092
                              logical_id=new_logical_id,
4093
                              children=dev.children)
4094
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4095
                                        new_drbd, False,
4096
                                        _GetInstanceInfoText(instance)):
4097
        self.cfg.ReleaseDRBDMinors(instance.name)
4098
        raise errors.OpExecError("Failed to create new DRBD on"
4099
                                 " node '%s'" % new_node)
4100

    
4101
    for dev in instance.disks:
4102
      # we have new devices, shutdown the drbd on the old secondary
4103
      info("shutting down drbd for %s on old node" % dev.iv_name)
4104
      cfg.SetDiskID(dev, old_node)
4105
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4106
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4107
                hint="Please cleanup this device manually as soon as possible")
4108

    
4109
    info("detaching primary drbds from the network (=> standalone)")
4110
    done = 0
4111
    for dev in instance.disks:
4112
      cfg.SetDiskID(dev, pri_node)
4113
      # set the network part of the physical (unique in bdev terms) id
4114
      # to None, meaning detach from network
4115
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4116
      # and 'find' the device, which will 'fix' it to match the
4117
      # standalone state
4118
      if self.rpc.call_blockdev_find(pri_node, dev):
4119
        done += 1
4120
      else:
4121
        warning("Failed to detach drbd %s from network, unusual case" %
4122
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # now we can remove the temp minors, as the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


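# Editor's note (illustrative summary only, not original code): the secondary
# replacement path above proceeds as follows: detach the DRBD devices from the
# old secondary, rewrite each disk's logical_id in the configuration to point
# at the new secondary, release the temporary DRBD minors, re-"find" the
# primary DRBDs so they reconnect, wait for sync (step 5), verify that no
# device is still degraded, and finally remove the old LVs on the old
# secondary (step 6).
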
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


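# Editor's note (illustrative only; the message text below is a hypothetical
# example): in LUGrowDisk.Exec above each call_blockdev_grow reply is expected
# to be a two-element sequence of (status, message), for instance
#     (True, "")                                   # grow succeeded on node
#     (False, "not enough free space in volume group")
# An empty or malformed reply, or a False status, aborts with OpExecError.
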
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" %
                                     self.op.instance_name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        "hypervisor": instance.hypervisor,
        }

      htkind = instance.hypervisor
      if htkind == constants.HT_XEN_PVM:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


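# Editor's note (illustrative sketch of the structure built above; the
# instance name is hypothetical): LUQueryInstanceData.Exec returns a mapping
# of instance name to a per-instance dict, roughly
#     {"instance1.example.com": {
#         "name": ..., "config_state": "up"/"down", "run_state": "up"/"down",
#         "pnode": ..., "snodes": [...], "os": ..., "memory": ...,
#         "vcpus": ..., "hypervisor": ...,
#         "nics": [(mac, ip, bridge), ...],
#         "disks": [<dict from _ComputeDiskStatus>, ...],
#         # plus kernel_path/initrd_path for Xen PVM, the hvm_* fields for
#         # Xen HVM, and vnc_*/network_port when the hypervisor needs a port
#         }}
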
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to"
                           " secondary node %s" % node)

    # Xen HVM device type checks
    if instance.hypervisor == constants.HT_XEN_HVM:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return self.rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
      if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance, cluster_name):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not self.rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceed, the backup would be removed because OpQueryExports substitutes
    # an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it
    # (and we can also remove exports for an already-removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) matches.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    # FIXME: here we have only one hypervisor information, but
    # instance can belong to different hypervisors
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           cfg.GetHypervisorType())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


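# Editor's note (illustrative only, derived from the code above): for an
# "allocate" run, IAllocator._BuildInputData serializes a structure roughly
# like
#     {"version": 1, "cluster_name": ..., "cluster_tags": [...],
#      "enable_hypervisors": [...], "nodes": {...}, "instances": {...},
#      "request": {"type": "allocate", "name": ..., "disk_template": ...,
#                  "tags": [...], "os": ..., "vcpus": ..., "memory": ...,
#                  "disks": [...], "disk_space_total": ..., "nics": [...],
#                  "required_nodes": ...}}
# while _ValidateResult requires the external script's output to be a dict
# containing at least the keys "success", "info" and "nodes" (a list).
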
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the test direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result