lib/cmdlib.py @ 62c9ec92
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_BGL = True
69

    
70
  def __init__(self, processor, op, context):
71
    """Constructor for LogicalUnit.
72

73
    This needs to be overridden in derived classes in order to check op
74
    validity.
75

76
    """
77
    self.proc = processor
78
    self.op = op
79
    self.cfg = context.cfg
80
    self.context = context
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90

    
91
    for attr_name in self._OP_REQP:
92
      attr_val = getattr(op, attr_name, None)
93
      if attr_val is None:
94
        raise errors.OpPrereqError("Required parameter '%s' missing" %
95
                                   attr_name)
96

    
97
    if not self.cfg.IsCluster():
98
      raise errors.OpPrereqError("Cluster not initialized yet,"
99
                                 " use 'gnt-cluster init' first.")
100
    if self.REQ_MASTER:
101
      master = self.cfg.GetMasterNode()
102
      if master != utils.HostInfo().name:
103
        raise errors.OpPrereqError("Commands must be run on the master"
104
                                   " node %s" % master)
105

    
106
  def __GetSSH(self):
107
    """Returns the SshRunner object
108

109
    """
110
    if not self.__ssh:
111
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
112
    return self.__ssh
113

    
114
  ssh = property(fget=__GetSSH)
115

    
116
  def ExpandNames(self):
117
    """Expand names for this LU.
118

119
    This method is called before starting to execute the opcode, and it should
120
    update all the parameters of the opcode to their canonical form (e.g. a
121
    short node name must be fully expanded after this method has successfully
122
    completed). This way locking, hooks, logging, etc. can work correctly.
123

124
    LUs which implement this method must also populate the self.needed_locks
125
    member, as a dict with lock levels as keys, and a list of needed lock names
126
    as values. Rules:
127
      - Use an empty dict if you don't need any lock
128
      - If you don't need any lock at a particular level omit that level
129
      - Don't put anything for the BGL level
130
      - If you want all locks at a level use locking.ALL_SET as a value
131

132
    If you need to share locks (rather than acquire them exclusively) at one
133
    level you can modify self.share_locks, setting a true value (usually 1) for
134
    that level. By default locks are not shared.
135

136
    Examples:
137
    # Acquire all nodes and one instance
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: locking.ALL_SET,
140
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141
    }
142
    # Acquire just two nodes
143
    self.needed_locks = {
144
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145
    }
146
    # Acquire no locks
147
    self.needed_locks = {} # No, you can't leave it to the default value None
148

149
    """
150
    # The implementation of this method is mandatory only if the new LU is
151
    # concurrent, so that old LUs don't need to be changed all at the same
152
    # time.
153
    if self.REQ_BGL:
154
      self.needed_locks = {} # Exclusive LUs don't need locks.
155
    else:
156
      raise NotImplementedError
157

    
158
  def DeclareLocks(self, level):
159
    """Declare LU locking needs for a level
160

161
    While most LUs can just declare their locking needs at ExpandNames time,
162
    sometimes there's the need to calculate some locks after having acquired
163
    the ones before. This function is called just before acquiring locks at a
164
    particular level, but after acquiring the ones at lower levels, and permits
165
    such calculations. It can be used to modify self.needed_locks, and by
166
    default it does nothing.
167

168
    This function is only called if you have something already set in
169
    self.needed_locks for the level.
170

171
    @param level: Locking level which is going to be locked
172
    @type level: member of ganeti.locking.LEVELS
173

174
    """
175

    
176
  def CheckPrereq(self):
177
    """Check prerequisites for this LU.
178

179
    This method should check that the prerequisites for the execution
180
    of this LU are fulfilled. It can do internode communication, but
181
    it should be idempotent - no cluster or system changes are
182
    allowed.
183

184
    The method should raise errors.OpPrereqError in case something is
185
    not fulfilled. Its return value is ignored.
186

187
    This method should also update all the parameters of the opcode to
188
    their canonical form if it hasn't been done by ExpandNames before.
189

190
    """
191
    raise NotImplementedError
192

    
193
  def Exec(self, feedback_fn):
194
    """Execute the LU.
195

196
    This method should implement the actual work. It should raise
197
    errors.OpExecError for failures that are somewhat dealt with in
198
    code, or expected.
199

200
    """
201
    raise NotImplementedError
202

    
203
  def BuildHooksEnv(self):
204
    """Build hooks environment for this LU.
205

206
    This method should return a three-element tuple consisting of: a dict
207
    containing the environment that will be used for running the
208
    specific hook for this LU, a list of node names on which the hook
209
    should run before the execution, and a list of node names on which
210
    the hook should run after the execution.
211

212
    The keys of the dict must not be prefixed with 'GANETI_' as this will
213
    be handled in the hooks runner. Also note additional keys will be
214
    added by the hooks runner. If the LU doesn't define any
215
    environment, an empty dict (and not None) should be returned.
216

217
    "No nodes" should be represented as an empty list (and not None).
218

219
    Note that if the HPATH for a LU class is None, this function will
220
    not be called.
221

222
    """
223
    raise NotImplementedError
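  # Illustrative sketch (not from the original file): a minimal concrete
  # implementation returning the env dict plus the pre/post node lists; here
  # the hooks would run only on the master node:
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.cfg.GetClusterName()}
  #     mn = self.cfg.GetMasterNode()
  #     return env, [mn], [mn]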
224

    
225
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226
    """Notify the LU about the results of its hooks.
227

228
    This method is called every time a hooks phase is executed, and notifies
229
    the Logical Unit about the hooks' result. The LU can then use it to alter
230
    its result based on the hooks.  By default the method does nothing and the
231
    previous result is passed back unchanged but any LU can define it if it
232
    wants to use the local cluster hook-scripts somehow.
233

234
    Args:
235
      phase: the hooks phase that has just been run
236
      hook_results: the results of the multi-node hooks rpc call
237
      feedback_fn: function to send feedback back to the caller
238
      lu_result: the previous result this LU had, or None in the PRE phase.
239

240
    """
241
    return lu_result
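  # For a concrete override see LUVerifyCluster.HooksCallBack below, which
  # inspects the POST-phase hook results and folds failures into its result.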
242

    
243
  def _ExpandAndLockInstance(self):
244
    """Helper function to expand and lock an instance.
245

246
    Many LUs that work on an instance take its name in self.op.instance_name
247
    and need to expand it and then declare the expanded name for locking. This
248
    function does it, and then updates self.op.instance_name to the expanded
249
    name. It also initializes needed_locks as a dict, if this hasn't been done
250
    before.
251

252
    """
253
    if self.needed_locks is None:
254
      self.needed_locks = {}
255
    else:
256
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257
        "_ExpandAndLockInstance called with instance-level locks set"
258
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259
    if expanded_name is None:
260
      raise errors.OpPrereqError("Instance '%s' not known" %
261
                                  self.op.instance_name)
262
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263
    self.op.instance_name = expanded_name
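  # Illustrative sketch (not from the original file): how an instance LU would
  # typically use this helper from ExpandNames, deferring the node locks to
  # DeclareLocks via _LockInstancesNodes (defined below):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE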
264

    
265
  def _LockInstancesNodes(self, primary_only=False):
266
    """Helper function to declare instances' nodes for locking.
267

268
    This function should be called after locking one or more instances to lock
269
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270
    with all primary or secondary nodes for instances already locked and
271
    present in self.needed_locks[locking.LEVEL_INSTANCE].
272

273
    It should be called from DeclareLocks, and for safety only works if
274
    self.recalculate_locks[locking.LEVEL_NODE] is set.
275

276
    In the future it may grow parameters to just lock some instance's nodes, or
277
    to just lock primaries or secondary nodes, if needed.
278

279
    It should be called in DeclareLocks in a way similar to:
280

281
    if level == locking.LEVEL_NODE:
282
      self._LockInstancesNodes()
283

284
    @type primary_only: boolean
285
    @param primary_only: only lock primary nodes of locked instances
286

287
    """
288
    assert locking.LEVEL_NODE in self.recalculate_locks, \
289
      "_LockInstancesNodes helper function called with no nodes to recalculate"
290

    
291
    # TODO: check if we've really been called with the instance locks held
292

    
293
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294
    # future we might want to have different behaviors depending on the value
295
    # of self.recalculate_locks[locking.LEVEL_NODE]
296
    wanted_nodes = []
297
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298
      instance = self.context.cfg.GetInstanceInfo(instance_name)
299
      wanted_nodes.append(instance.primary_node)
300
      if not primary_only:
301
        wanted_nodes.extend(instance.secondary_nodes)
302

    
303
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
307

    
308
    del self.recalculate_locks[locking.LEVEL_NODE]
309

    
310

    
311
class NoHooksLU(LogicalUnit):
312
  """Simple LU which runs no hooks.
313

314
  This LU is intended as a parent for other LogicalUnits which will
315
  run no hooks, in order to reduce duplicate code.
316

317
  """
318
  HPATH = None
319
  HTYPE = None
320

    
321

    
322
def _GetWantedNodes(lu, nodes):
323
  """Returns list of checked and expanded node names.
324

325
  Args:
326
    nodes: List of nodes (strings) or None for all
327

328
  """
329
  if not isinstance(nodes, list):
330
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
331

    
332
  if not nodes:
333
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334
      " non-empty list of nodes whose name is to be expanded.")
335

    
336
  wanted = []
337
  for name in nodes:
338
    node = lu.cfg.ExpandNodeName(name)
339
    if node is None:
340
      raise errors.OpPrereqError("No such node name '%s'" % name)
341
    wanted.append(node)
342

    
343
  return utils.NiceSort(wanted)
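# Illustrative example (hypothetical names): short node names are expanded to
# their full configuration names and the result is sorted, e.g.
#   _GetWantedNodes(lu, ["node2", "node1"])
#     -> ["node1.example.com", "node2.example.com"]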
344

    
345

    
346
def _GetWantedInstances(lu, instances):
347
  """Returns list of checked and expanded instance names.
348

349
  Args:
350
    instances: List of instances (strings) or None for all
351

352
  """
353
  if not isinstance(instances, list):
354
    raise errors.OpPrereqError("Invalid argument type 'instances'")
355

    
356
  if instances:
357
    wanted = []
358

    
359
    for name in instances:
360
      instance = lu.cfg.ExpandInstanceName(name)
361
      if instance is None:
362
        raise errors.OpPrereqError("No such instance name '%s'" % name)
363
      wanted.append(instance)
364

    
365
  else:
366
    wanted = lu.cfg.GetInstanceList()
367
  return utils.NiceSort(wanted)
368

    
369

    
370
def _CheckOutputFields(static, dynamic, selected):
371
  """Checks whether all selected fields are valid.
372

373
  Args:
374
    static: Static fields
375
    dynamic: Dynamic fields
376

377
  """
378
  static_fields = frozenset(static)
379
  dynamic_fields = frozenset(dynamic)
380

    
381
  all_fields = static_fields | dynamic_fields
382

    
383
  if not all_fields.issuperset(selected):
384
    raise errors.OpPrereqError("Unknown output fields selected: %s"
385
                               % ",".join(frozenset(selected).
386
                                          difference(all_fields)))
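# Illustrative usage (mirroring the query LUs later in this module):
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)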
387

    
388

    
389
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390
                          memory, vcpus, nics):
391
  """Builds instance related env variables for hooks from single variables.
392

393
  Args:
394
    secondary_nodes: List of secondary nodes as strings
395
  """
396
  env = {
397
    "OP_TARGET": name,
398
    "INSTANCE_NAME": name,
399
    "INSTANCE_PRIMARY": primary_node,
400
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401
    "INSTANCE_OS_TYPE": os_type,
402
    "INSTANCE_STATUS": status,
403
    "INSTANCE_MEMORY": memory,
404
    "INSTANCE_VCPUS": vcpus,
405
  }
406

    
407
  if nics:
408
    nic_count = len(nics)
409
    for idx, (ip, bridge, mac) in enumerate(nics):
410
      if ip is None:
411
        ip = ""
412
      env["INSTANCE_NIC%d_IP" % idx] = ip
413
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415
  else:
416
    nic_count = 0
417

    
418
  env["INSTANCE_NIC_COUNT"] = nic_count
419

    
420
  return env
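# Illustrative example (hypothetical values) of the environment built for a
# single-NIC instance; the hooks runner later adds the GANETI_ prefix to each
# key, as noted in LogicalUnit.BuildHooksEnv:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#     "INSTANCE_NIC_COUNT": 1,
#   }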
421

    
422

    
423
def _BuildInstanceHookEnvByObject(instance, override=None):
424
  """Builds instance related env variables for hooks from an object.
425

426
  Args:
427
    instance: objects.Instance object of instance
428
    override: dict of values to override
429
  """
430
  args = {
431
    'name': instance.name,
432
    'primary_node': instance.primary_node,
433
    'secondary_nodes': instance.secondary_nodes,
434
    'os_type': instance.os,
435
    'status': instance.status,
436
    'memory': instance.memory,
437
    'vcpus': instance.vcpus,
438
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
439
  }
440
  if override:
441
    args.update(override)
442
  return _BuildInstanceHookEnv(**args)
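# Note (illustrative): the override dict replaces the *argument* values built
# above (e.g. 'memory', 'status'), not the final INSTANCE_* variable names:
#
#   _BuildInstanceHookEnvByObject(instance, override={"memory": 1024})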
443

    
444

    
445
def _CheckInstanceBridgesExist(instance):
446
  """Check that the brigdes needed by an instance exist.
447

448
  """
449
  # check bridges existence
450
  brlist = [nic.bridge for nic in instance.nics]
451
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
452
    raise errors.OpPrereqError("one or more target bridges %s do not"
453
                               " exist on destination node '%s'" %
454
                               (brlist, instance.primary_node))
455

    
456

    
457
class LUDestroyCluster(NoHooksLU):
458
  """Logical unit for destroying the cluster.
459

460
  """
461
  _OP_REQP = []
462

    
463
  def CheckPrereq(self):
464
    """Check prerequisites.
465

466
    This checks whether the cluster is empty.
467

468
    Any errors are signalled by raising errors.OpPrereqError.
469

470
    """
471
    master = self.cfg.GetMasterNode()
472

    
473
    nodelist = self.cfg.GetNodeList()
474
    if len(nodelist) != 1 or nodelist[0] != master:
475
      raise errors.OpPrereqError("There are still %d node(s) in"
476
                                 " this cluster." % (len(nodelist) - 1))
477
    instancelist = self.cfg.GetInstanceList()
478
    if instancelist:
479
      raise errors.OpPrereqError("There are still %d instance(s) in"
480
                                 " this cluster." % len(instancelist))
481

    
482
  def Exec(self, feedback_fn):
483
    """Destroys the cluster.
484

485
    """
486
    master = self.cfg.GetMasterNode()
487
    if not rpc.call_node_stop_master(master, False):
488
      raise errors.OpExecError("Could not disable the master role")
489
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
490
    utils.CreateBackup(priv_key)
491
    utils.CreateBackup(pub_key)
492
    return master
493

    
494

    
495
class LUVerifyCluster(LogicalUnit):
496
  """Verifies the cluster status.
497

498
  """
499
  HPATH = "cluster-verify"
500
  HTYPE = constants.HTYPE_CLUSTER
501
  _OP_REQP = ["skip_checks"]
502
  REQ_BGL = False
503

    
504
  def ExpandNames(self):
505
    self.needed_locks = {
506
      locking.LEVEL_NODE: locking.ALL_SET,
507
      locking.LEVEL_INSTANCE: locking.ALL_SET,
508
    }
509
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
510

    
511
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
512
                  remote_version, feedback_fn):
513
    """Run multiple tests against a node.
514

515
    Test list:
516
      - compares ganeti version
517
      - checks vg existence and size > 20G
518
      - checks config file checksum
519
      - checks ssh to other nodes
520

521
    Args:
522
      node: name of the node to check
523
      file_list: required list of files
524
      local_cksum: dictionary of local files and their checksums
525

526
    """
527
    # compares ganeti version
528
    local_version = constants.PROTOCOL_VERSION
529
    if not remote_version:
530
      feedback_fn("  - ERROR: connection to %s failed" % (node))
531
      return True
532

    
533
    if local_version != remote_version:
534
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
535
                      (local_version, node, remote_version))
536
      return True
537

    
538
    # checks vg existence and size > 20G
539

    
540
    bad = False
541
    if not vglist:
542
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
543
                      (node,))
544
      bad = True
545
    else:
546
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
547
                                            constants.MIN_VG_SIZE)
548
      if vgstatus:
549
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
550
        bad = True
551

    
552
    # checks config file checksum
553
    # checks ssh to any
554

    
555
    if 'filelist' not in node_result:
556
      bad = True
557
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
558
    else:
559
      remote_cksum = node_result['filelist']
560
      for file_name in file_list:
561
        if file_name not in remote_cksum:
562
          bad = True
563
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
564
        elif remote_cksum[file_name] != local_cksum[file_name]:
565
          bad = True
566
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
567

    
568
    if 'nodelist' not in node_result:
569
      bad = True
570
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
571
    else:
572
      if node_result['nodelist']:
573
        bad = True
574
        for node in node_result['nodelist']:
575
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
576
                          (node, node_result['nodelist'][node]))
577
    if 'node-net-test' not in node_result:
578
      bad = True
579
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
580
    else:
581
      if node_result['node-net-test']:
582
        bad = True
583
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
584
        for node in nlist:
585
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
586
                          (node, node_result['node-net-test'][node]))
587

    
588
    hyp_result = node_result.get('hypervisor', None)
589
    if hyp_result is not None:
590
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
591
    return bad
592

    
593
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
594
                      node_instance, feedback_fn):
595
    """Verify an instance.
596

597
    This function checks to see if the required block devices are
598
    available on the instance's node.
599

600
    """
601
    bad = False
602

    
603
    node_current = instanceconfig.primary_node
604

    
605
    node_vol_should = {}
606
    instanceconfig.MapLVsByNode(node_vol_should)
607

    
608
    for node in node_vol_should:
609
      for volume in node_vol_should[node]:
610
        if node not in node_vol_is or volume not in node_vol_is[node]:
611
          feedback_fn("  - ERROR: volume %s missing on node %s" %
612
                          (volume, node))
613
          bad = True
614

    
615
    if instanceconfig.status != 'down':
616
      if (node_current not in node_instance or
617
          not instance in node_instance[node_current]):
618
        feedback_fn("  - ERROR: instance %s not running on node %s" %
619
                        (instance, node_current))
620
        bad = True
621

    
622
    for node in node_instance:
623
      if (not node == node_current):
624
        if instance in node_instance[node]:
625
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
626
                          (instance, node))
627
          bad = True
628

    
629
    return bad
630

    
631
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
632
    """Verify if there are any unknown volumes in the cluster.
633

634
    The .os, .swap and backup volumes are ignored. All other volumes are
635
    reported as unknown.
636

637
    """
638
    bad = False
639

    
640
    for node in node_vol_is:
641
      for volume in node_vol_is[node]:
642
        if node not in node_vol_should or volume not in node_vol_should[node]:
643
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
644
                      (volume, node))
645
          bad = True
646
    return bad
647

    
648
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
649
    """Verify the list of running instances.
650

651
    This checks what instances are running but unknown to the cluster.
652

653
    """
654
    bad = False
655
    for node in node_instance:
656
      for runninginstance in node_instance[node]:
657
        if runninginstance not in instancelist:
658
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
659
                          (runninginstance, node))
660
          bad = True
661
    return bad
662

    
663
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
664
    """Verify N+1 Memory Resilience.
665

666
    Check that if one single node dies we can still start all the instances it
667
    was primary for.
668

669
    """
670
    bad = False
671

    
672
    for node, nodeinfo in node_info.iteritems():
673
      # This code checks that every node which is now listed as secondary has
674
      # enough memory to host all instances it is supposed to, should a single
675
      # other node in the cluster fail.
676
      # FIXME: not ready for failover to an arbitrary node
677
      # FIXME: does not support file-backed instances
678
      # WARNING: we currently take into account down instances as well as up
679
      # ones, considering that even if they're down someone might want to start
680
      # them even in the event of a node failure.
681
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
682
        needed_mem = 0
683
        for instance in instances:
684
          needed_mem += instance_cfg[instance].memory
685
        if nodeinfo['mfree'] < needed_mem:
686
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
687
                      " failovers should node %s fail" % (node, prinode))
688
          bad = True
689
    return bad
690

    
691
  def CheckPrereq(self):
692
    """Check prerequisites.
693

694
    Transform the list of checks we're going to skip into a set and check that
695
    all its members are valid.
696

697
    """
698
    self.skip_set = frozenset(self.op.skip_checks)
699
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
700
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
701

    
702
  def BuildHooksEnv(self):
703
    """Build hooks env.
704

705
    Cluster-Verify hooks are run only in the post phase; their failure causes
706
    the output to be logged in the verify output and the verification to fail.
707

708
    """
709
    all_nodes = self.cfg.GetNodeList()
710
    # TODO: populate the environment with useful information for verify hooks
711
    env = {}
712
    return env, [], all_nodes
713

    
714
  def Exec(self, feedback_fn):
715
    """Verify integrity of cluster, performing various test on nodes.
716

717
    """
718
    bad = False
719
    feedback_fn("* Verifying global settings")
720
    for msg in self.cfg.VerifyConfig():
721
      feedback_fn("  - ERROR: %s" % msg)
722

    
723
    vg_name = self.cfg.GetVGName()
724
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
725
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
726
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
727
    i_non_redundant = [] # Non redundant instances
728
    node_volume = {}
729
    node_instance = {}
730
    node_info = {}
731
    instance_cfg = {}
732

    
733
    # FIXME: verify OS list
734
    # do local checksums
735
    file_names = []
736
    file_names.append(constants.SSL_CERT_FILE)
737
    file_names.append(constants.CLUSTER_CONF_FILE)
738
    local_checksums = utils.FingerprintFiles(file_names)
739

    
740
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
741
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
742
    all_instanceinfo = rpc.call_instance_list(nodelist)
743
    all_vglist = rpc.call_vg_list(nodelist)
744
    node_verify_param = {
745
      'filelist': file_names,
746
      'nodelist': nodelist,
747
      'hypervisor': None,
748
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
749
                        for node in nodeinfo]
750
      }
751
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param,
752
                                      self.cfg.GetClusterName())
753
    all_rversion = rpc.call_version(nodelist)
754
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
755

    
756
    for node in nodelist:
757
      feedback_fn("* Verifying node %s" % node)
758
      result = self._VerifyNode(node, file_names, local_checksums,
759
                                all_vglist[node], all_nvinfo[node],
760
                                all_rversion[node], feedback_fn)
761
      bad = bad or result
762

    
763
      # node_volume
764
      volumeinfo = all_volumeinfo[node]
765

    
766
      if isinstance(volumeinfo, basestring):
767
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
768
                    (node, volumeinfo[-400:].encode('string_escape')))
769
        bad = True
770
        node_volume[node] = {}
771
      elif not isinstance(volumeinfo, dict):
772
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
773
        bad = True
774
        continue
775
      else:
776
        node_volume[node] = volumeinfo
777

    
778
      # node_instance
779
      nodeinstance = all_instanceinfo[node]
780
      if type(nodeinstance) != list:
781
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
782
        bad = True
783
        continue
784

    
785
      node_instance[node] = nodeinstance
786

    
787
      # node_info
788
      nodeinfo = all_ninfo[node]
789
      if not isinstance(nodeinfo, dict):
790
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
791
        bad = True
792
        continue
793

    
794
      try:
795
        node_info[node] = {
796
          "mfree": int(nodeinfo['memory_free']),
797
          "dfree": int(nodeinfo['vg_free']),
798
          "pinst": [],
799
          "sinst": [],
800
          # dictionary holding all instances this node is secondary for,
801
          # grouped by their primary node. Each key is a cluster node, and each
802
          # value is a list of instances which have the key as primary and the
803
          # current node as secondary.  this is handy to calculate N+1 memory
804
          # availability if you can only failover from a primary to its
805
          # secondary.
806
          "sinst-by-pnode": {},
807
        }
808
      except ValueError:
809
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
810
        bad = True
811
        continue
812

    
813
    node_vol_should = {}
814

    
815
    for instance in instancelist:
816
      feedback_fn("* Verifying instance %s" % instance)
817
      inst_config = self.cfg.GetInstanceInfo(instance)
818
      result = self._VerifyInstance(instance, inst_config, node_volume,
819
                                    node_instance, feedback_fn)
820
      bad = bad or result
821

    
822
      inst_config.MapLVsByNode(node_vol_should)
823

    
824
      instance_cfg[instance] = inst_config
825

    
826
      pnode = inst_config.primary_node
827
      if pnode in node_info:
828
        node_info[pnode]['pinst'].append(instance)
829
      else:
830
        feedback_fn("  - ERROR: instance %s, connection to primary node"
831
                    " %s failed" % (instance, pnode))
832
        bad = True
833

    
834
      # If the instance is non-redundant we cannot survive losing its primary
835
      # node, so we are not N+1 compliant. On the other hand we have no disk
836
      # templates with more than one secondary so that situation is not well
837
      # supported either.
838
      # FIXME: does not support file-backed instances
839
      if len(inst_config.secondary_nodes) == 0:
840
        i_non_redundant.append(instance)
841
      elif len(inst_config.secondary_nodes) > 1:
842
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
843
                    % instance)
844

    
845
      for snode in inst_config.secondary_nodes:
846
        if snode in node_info:
847
          node_info[snode]['sinst'].append(instance)
848
          if pnode not in node_info[snode]['sinst-by-pnode']:
849
            node_info[snode]['sinst-by-pnode'][pnode] = []
850
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
851
        else:
852
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
853
                      " %s failed" % (instance, snode))
854

    
855
    feedback_fn("* Verifying orphan volumes")
856
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
857
                                       feedback_fn)
858
    bad = bad or result
859

    
860
    feedback_fn("* Verifying remaining instances")
861
    result = self._VerifyOrphanInstances(instancelist, node_instance,
862
                                         feedback_fn)
863
    bad = bad or result
864

    
865
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
866
      feedback_fn("* Verifying N+1 Memory redundancy")
867
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
868
      bad = bad or result
869

    
870
    feedback_fn("* Other Notes")
871
    if i_non_redundant:
872
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
873
                  % len(i_non_redundant))
874

    
875
    return not bad
876

    
877
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
878
    """Analize the post-hooks' result, handle it, and send some
879
    nicely-formatted feedback back to the user.
880

881
    Args:
882
      phase: the hooks phase that has just been run
883
      hooks_results: the results of the multi-node hooks rpc call
884
      feedback_fn: function to send feedback back to the caller
885
      lu_result: previous Exec result
886

887
    """
888
    # We only really run POST phase hooks, and are only interested in
889
    # their results
890
    if phase == constants.HOOKS_PHASE_POST:
891
      # Used to change hooks' output to proper indentation
892
      indent_re = re.compile('^', re.M)
893
      feedback_fn("* Hooks Results")
894
      if not hooks_results:
895
        feedback_fn("  - ERROR: general communication failure")
896
        lu_result = 1
897
      else:
898
        for node_name in hooks_results:
899
          show_node_header = True
900
          res = hooks_results[node_name]
901
          if res is False or not isinstance(res, list):
902
            feedback_fn("    Communication failure")
903
            lu_result = 1
904
            continue
905
          for script, hkr, output in res:
906
            if hkr == constants.HKR_FAIL:
907
              # The node header is only shown once, if there are
908
              # failing hooks on that node
909
              if show_node_header:
910
                feedback_fn("  Node %s:" % node_name)
911
                show_node_header = False
912
              feedback_fn("    ERROR: Script %s failed, output:" % script)
913
              output = indent_re.sub('      ', output)
914
              feedback_fn("%s" % output)
915
              lu_result = 1
916

    
917
      return lu_result
918

    
919

    
920
class LUVerifyDisks(NoHooksLU):
921
  """Verifies the cluster disks status.
922

923
  """
924
  _OP_REQP = []
925
  REQ_BGL = False
926

    
927
  def ExpandNames(self):
928
    self.needed_locks = {
929
      locking.LEVEL_NODE: locking.ALL_SET,
930
      locking.LEVEL_INSTANCE: locking.ALL_SET,
931
    }
932
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
933

    
934
  def CheckPrereq(self):
935
    """Check prerequisites.
936

937
    This has no prerequisites.
938

939
    """
940
    pass
941

    
942
  def Exec(self, feedback_fn):
943
    """Verify integrity of cluster disks.
944

945
    """
946
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
947

    
948
    vg_name = self.cfg.GetVGName()
949
    nodes = utils.NiceSort(self.cfg.GetNodeList())
950
    instances = [self.cfg.GetInstanceInfo(name)
951
                 for name in self.cfg.GetInstanceList()]
952

    
953
    nv_dict = {}
954
    for inst in instances:
955
      inst_lvs = {}
956
      if (inst.status != "up" or
957
          inst.disk_template not in constants.DTS_NET_MIRROR):
958
        continue
959
      inst.MapLVsByNode(inst_lvs)
960
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
961
      for node, vol_list in inst_lvs.iteritems():
962
        for vol in vol_list:
963
          nv_dict[(node, vol)] = inst
964

    
965
    if not nv_dict:
966
      return result
967

    
968
    node_lvs = rpc.call_volume_list(nodes, vg_name)
969

    
970
    to_act = set()
971
    for node in nodes:
972
      # node_volume
973
      lvs = node_lvs[node]
974

    
975
      if isinstance(lvs, basestring):
976
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
977
        res_nlvm[node] = lvs
978
      elif not isinstance(lvs, dict):
979
        logger.Info("connection to node %s failed or invalid data returned" %
980
                    (node,))
981
        res_nodes.append(node)
982
        continue
983

    
984
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
985
        inst = nv_dict.pop((node, lv_name), None)
986
        if (not lv_online and inst is not None
987
            and inst.name not in res_instances):
988
          res_instances.append(inst.name)
989

    
990
    # any leftover items in nv_dict are missing LVs, let's arrange the
991
    # data better
992
    for key, inst in nv_dict.iteritems():
993
      if inst.name not in res_missing:
994
        res_missing[inst.name] = []
995
      res_missing[inst.name].append(key)
996

    
997
    return result
998

    
999

    
1000
class LURenameCluster(LogicalUnit):
1001
  """Rename the cluster.
1002

1003
  """
1004
  HPATH = "cluster-rename"
1005
  HTYPE = constants.HTYPE_CLUSTER
1006
  _OP_REQP = ["name"]
1007

    
1008
  def BuildHooksEnv(self):
1009
    """Build hooks env.
1010

1011
    """
1012
    env = {
1013
      "OP_TARGET": self.cfg.GetClusterName(),
1014
      "NEW_NAME": self.op.name,
1015
      }
1016
    mn = self.cfg.GetMasterNode()
1017
    return env, [mn], [mn]
1018

    
1019
  def CheckPrereq(self):
1020
    """Verify that the passed name is a valid one.
1021

1022
    """
1023
    hostname = utils.HostInfo(self.op.name)
1024

    
1025
    new_name = hostname.name
1026
    self.ip = new_ip = hostname.ip
1027
    old_name = self.cfg.GetClusterName()
1028
    old_ip = self.cfg.GetMasterIP()
1029
    if new_name == old_name and new_ip == old_ip:
1030
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1031
                                 " cluster has changed")
1032
    if new_ip != old_ip:
1033
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1034
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1035
                                   " reachable on the network. Aborting." %
1036
                                   new_ip)
1037

    
1038
    self.op.name = new_name
1039

    
1040
  def Exec(self, feedback_fn):
1041
    """Rename the cluster.
1042

1043
    """
1044
    clustername = self.op.name
1045
    ip = self.ip
1046

    
1047
    # shutdown the master IP
1048
    master = self.cfg.GetMasterNode()
1049
    if not rpc.call_node_stop_master(master, False):
1050
      raise errors.OpExecError("Could not disable the master role")
1051

    
1052
    try:
1053
      # modify the sstore
1054
      # TODO: sstore
1055
      ss.SetKey(ss.SS_MASTER_IP, ip)
1056
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1057

    
1058
      # Distribute updated ss config to all nodes
1059
      myself = self.cfg.GetNodeInfo(master)
1060
      dist_nodes = self.cfg.GetNodeList()
1061
      if myself.name in dist_nodes:
1062
        dist_nodes.remove(myself.name)
1063

    
1064
      logger.Debug("Copying updated ssconf data to all nodes")
1065
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1066
        fname = ss.KeyToFilename(keyname)
1067
        result = rpc.call_upload_file(dist_nodes, fname)
1068
        for to_node in dist_nodes:
1069
          if not result[to_node]:
1070
            logger.Error("copy of file %s to node %s failed" %
1071
                         (fname, to_node))
1072
    finally:
1073
      if not rpc.call_node_start_master(master, False):
1074
        logger.Error("Could not re-enable the master role on the master,"
1075
                     " please restart manually.")
1076

    
1077

    
1078
def _RecursiveCheckIfLVMBased(disk):
1079
  """Check if the given disk or its children are lvm-based.
1080

1081
  Args:
1082
    disk: ganeti.objects.Disk object
1083

1084
  Returns:
1085
    boolean indicating whether a LD_LV dev_type was found or not
1086

1087
  """
1088
  if disk.children:
1089
    for chdisk in disk.children:
1090
      if _RecursiveCheckIfLVMBased(chdisk):
1091
        return True
1092
  return disk.dev_type == constants.LD_LV
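# Illustrative example (hypothetical objects): for a DRBD disk whose children
# are two LVM logical volumes, the recursion finds an LD_LV child and the
# function returns True; for a purely file-based disk it returns False.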
1093

    
1094

    
1095
class LUSetClusterParams(LogicalUnit):
1096
  """Change the parameters of the cluster.
1097

1098
  """
1099
  HPATH = "cluster-modify"
1100
  HTYPE = constants.HTYPE_CLUSTER
1101
  _OP_REQP = []
1102
  REQ_BGL = False
1103

    
1104
  def ExpandNames(self):
1105
    # FIXME: in the future maybe other cluster params won't require checking on
1106
    # all nodes to be modified.
1107
    self.needed_locks = {
1108
      locking.LEVEL_NODE: locking.ALL_SET,
1109
    }
1110
    self.share_locks[locking.LEVEL_NODE] = 1
1111

    
1112
  def BuildHooksEnv(self):
1113
    """Build hooks env.
1114

1115
    """
1116
    env = {
1117
      "OP_TARGET": self.cfg.GetClusterName(),
1118
      "NEW_VG_NAME": self.op.vg_name,
1119
      }
1120
    mn = self.cfg.GetMasterNode()
1121
    return env, [mn], [mn]
1122

    
1123
  def CheckPrereq(self):
1124
    """Check prerequisites.
1125

1126
    This checks whether the given params don't conflict and
1127
    if the given volume group is valid.
1128

1129
    """
1130
    # FIXME: This only works because there is only one parameter that can be
1131
    # changed or removed.
1132
    if not self.op.vg_name:
1133
      instances = self.cfg.GetAllInstancesInfo().values()
1134
      for inst in instances:
1135
        for disk in inst.disks:
1136
          if _RecursiveCheckIfLVMBased(disk):
1137
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1138
                                       " lvm-based instances exist")
1139

    
1140
    # if vg_name not None, checks given volume group on all nodes
1141
    if self.op.vg_name:
1142
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1143
      vglist = rpc.call_vg_list(node_list)
1144
      for node in node_list:
1145
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1146
                                              constants.MIN_VG_SIZE)
1147
        if vgstatus:
1148
          raise errors.OpPrereqError("Error on node '%s': %s" %
1149
                                     (node, vgstatus))
1150

    
1151
  def Exec(self, feedback_fn):
1152
    """Change the parameters of the cluster.
1153

1154
    """
1155
    if self.op.vg_name != self.cfg.GetVGName():
1156
      self.cfg.SetVGName(self.op.vg_name)
1157
    else:
1158
      feedback_fn("Cluster LVM configuration already in desired"
1159
                  " state, not changing")
1160

    
1161

    
1162
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1163
  """Sleep and poll for an instance's disk to sync.
1164

1165
  """
1166
  if not instance.disks:
1167
    return True
1168

    
1169
  if not oneshot:
1170
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1171

    
1172
  node = instance.primary_node
1173

    
1174
  for dev in instance.disks:
1175
    cfgw.SetDiskID(dev, node)
1176

    
1177
  retries = 0
1178
  while True:
1179
    max_time = 0
1180
    done = True
1181
    cumul_degraded = False
1182
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1183
    if not rstats:
1184
      proc.LogWarning("Can't get any data from node %s" % node)
1185
      retries += 1
1186
      if retries >= 10:
1187
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1188
                                 " aborting." % node)
1189
      time.sleep(6)
1190
      continue
1191
    retries = 0
1192
    for i in range(len(rstats)):
1193
      mstat = rstats[i]
1194
      if mstat is None:
1195
        proc.LogWarning("Can't compute data for node %s/%s" %
1196
                        (node, instance.disks[i].iv_name))
1197
        continue
1198
      # we ignore the ldisk parameter
1199
      perc_done, est_time, is_degraded, _ = mstat
1200
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1201
      if perc_done is not None:
1202
        done = False
1203
        if est_time is not None:
1204
          rem_time = "%d estimated seconds remaining" % est_time
1205
          max_time = est_time
1206
        else:
1207
          rem_time = "no time estimate"
1208
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1209
                     (instance.disks[i].iv_name, perc_done, rem_time))
1210
    if done or oneshot:
1211
      break
1212

    
1213
    time.sleep(min(60, max_time))
1214

    
1215
  if done:
1216
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1217
  return not cumul_degraded
1218

    
1219

    
1220
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1221
  """Check that mirrors are not degraded.
1222

1223
  The ldisk parameter, if True, will change the test from the
1224
  is_degraded attribute (which represents overall non-ok status for
1225
  the device(s)) to the ldisk (representing the local storage status).
1226

1227
  """
1228
  cfgw.SetDiskID(dev, node)
1229
  if ldisk:
1230
    idx = 6
1231
  else:
1232
    idx = 5
1233

    
1234
  result = True
1235
  if on_primary or dev.AssembleOnSecondary():
1236
    rstats = rpc.call_blockdev_find(node, dev)
1237
    if not rstats:
1238
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1239
      result = False
1240
    else:
1241
      result = result and (not rstats[idx])
1242
  if dev.children:
1243
    for child in dev.children:
1244
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1245

    
1246
  return result
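# Note (descriptive, based on the idx selection above): index 5 of the
# rpc.call_blockdev_find result is used as the overall is_degraded flag and
# index 6 as the ldisk (local storage) status.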
1247

    
1248

    
1249
class LUDiagnoseOS(NoHooksLU):
1250
  """Logical unit for OS diagnose/query.
1251

1252
  """
1253
  _OP_REQP = ["output_fields", "names"]
1254
  REQ_BGL = False
1255

    
1256
  def ExpandNames(self):
1257
    if self.op.names:
1258
      raise errors.OpPrereqError("Selective OS query not supported")
1259

    
1260
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1261
    _CheckOutputFields(static=[],
1262
                       dynamic=self.dynamic_fields,
1263
                       selected=self.op.output_fields)
1264

    
1265
    # Lock all nodes, in shared mode
1266
    self.needed_locks = {}
1267
    self.share_locks[locking.LEVEL_NODE] = 1
1268
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1269

    
1270
  def CheckPrereq(self):
1271
    """Check prerequisites.
1272

1273
    """
1274

    
1275
  @staticmethod
1276
  def _DiagnoseByOS(node_list, rlist):
1277
    """Remaps a per-node return list into an a per-os per-node dictionary
1278

1279
      Args:
1280
        node_list: a list with the names of all nodes
1281
        rlist: a map with node names as keys and OS objects as values
1282

1283
      Returns:
1284
        map: a map with osnames as keys and as value another map, with
1285
             nodes as
1286
             keys and list of OS objects as values
1287
             e.g. {"debian-etch": {"node1": [<object>,...],
1288
                                   "node2": [<object>,]}
1289
                  }
1290

1291
    """
1292
    all_os = {}
1293
    for node_name, nr in rlist.iteritems():
1294
      if not nr:
1295
        continue
1296
      for os_obj in nr:
1297
        if os_obj.name not in all_os:
1298
          # build a list of nodes for this os containing empty lists
1299
          # for each node in node_list
1300
          all_os[os_obj.name] = {}
1301
          for nname in node_list:
1302
            all_os[os_obj.name][nname] = []
1303
        all_os[os_obj.name][node_name].append(os_obj)
1304
    return all_os
1305

    
1306
  def Exec(self, feedback_fn):
1307
    """Compute the list of OSes.
1308

1309
    """
1310
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1311
    node_data = rpc.call_os_diagnose(node_list)
1312
    if node_data == False:
1313
      raise errors.OpExecError("Can't gather the list of OSes")
1314
    pol = self._DiagnoseByOS(node_list, node_data)
1315
    output = []
1316
    for os_name, os_data in pol.iteritems():
1317
      row = []
1318
      for field in self.op.output_fields:
1319
        if field == "name":
1320
          val = os_name
1321
        elif field == "valid":
1322
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1323
        elif field == "node_status":
1324
          val = {}
1325
          for node_name, nos_list in os_data.iteritems():
1326
            val[node_name] = [(v.status, v.path) for v in nos_list]
1327
        else:
1328
          raise errors.ParameterError(field)
1329
        row.append(val)
1330
      output.append(row)
1331

    
1332
    return output
1333

    
1334

    
1335
class LURemoveNode(LogicalUnit):
1336
  """Logical unit for removing a node.
1337

1338
  """
1339
  HPATH = "node-remove"
1340
  HTYPE = constants.HTYPE_NODE
1341
  _OP_REQP = ["node_name"]
1342

    
1343
  def BuildHooksEnv(self):
1344
    """Build hooks env.
1345

1346
    This doesn't run on the target node in the pre phase as a failed
1347
    node would then be impossible to remove.
1348

1349
    """
1350
    env = {
1351
      "OP_TARGET": self.op.node_name,
1352
      "NODE_NAME": self.op.node_name,
1353
      }
1354
    all_nodes = self.cfg.GetNodeList()
1355
    all_nodes.remove(self.op.node_name)
1356
    return env, all_nodes, all_nodes
1357

    
1358
  def CheckPrereq(self):
1359
    """Check prerequisites.
1360

1361
    This checks:
1362
     - the node exists in the configuration
1363
     - it does not have primary or secondary instances
1364
     - it's not the master
1365

1366
    Any errors are signalled by raising errors.OpPrereqError.
1367

1368
    """
1369
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1370
    if node is None:
1371
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1372

    
1373
    instance_list = self.cfg.GetInstanceList()
1374

    
1375
    masternode = self.cfg.GetMasterNode()
1376
    if node.name == masternode:
1377
      raise errors.OpPrereqError("Node is the master node,"
1378
                                 " you need to failover first.")
1379

    
1380
    for instance_name in instance_list:
1381
      instance = self.cfg.GetInstanceInfo(instance_name)
1382
      if node.name == instance.primary_node:
1383
        raise errors.OpPrereqError("Instance %s still running on the node,"
1384
                                   " please remove first." % instance_name)
1385
      if node.name in instance.secondary_nodes:
1386
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1387
                                   " please remove first." % instance_name)
1388
    self.op.node_name = node.name
1389
    self.node = node
1390

    
1391
  def Exec(self, feedback_fn):
1392
    """Removes the node from the cluster.
1393

1394
    """
1395
    node = self.node
1396
    logger.Info("stopping the node daemon and removing configs from node %s" %
1397
                node.name)
1398

    
1399
    self.context.RemoveNode(node.name)
1400

    
1401
    rpc.call_node_leave_cluster(node.name)
1402

    
1403

    
1404
class LUQueryNodes(NoHooksLU):
1405
  """Logical unit for querying nodes.
1406

1407
  """
1408
  _OP_REQP = ["output_fields", "names"]
1409
  REQ_BGL = False
1410

    
1411
  def ExpandNames(self):
1412
    self.dynamic_fields = frozenset([
1413
      "dtotal", "dfree",
1414
      "mtotal", "mnode", "mfree",
1415
      "bootid",
1416
      "ctotal",
1417
      ])
1418

    
1419
    self.static_fields = frozenset([
1420
      "name", "pinst_cnt", "sinst_cnt",
1421
      "pinst_list", "sinst_list",
1422
      "pip", "sip", "tags",
1423
      "serial_no",
1424
      ])
1425

    
1426
    _CheckOutputFields(static=self.static_fields,
1427
                       dynamic=self.dynamic_fields,
1428
                       selected=self.op.output_fields)
1429

    
1430
    self.needed_locks = {}
1431
    self.share_locks[locking.LEVEL_NODE] = 1
1432

    
1433
    if self.op.names:
1434
      self.wanted = _GetWantedNodes(self, self.op.names)
1435
    else:
1436
      self.wanted = locking.ALL_SET
1437

    
1438
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1439
    if self.do_locking:
1440
      # if we don't request only static fields, we need to lock the nodes
1441
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1442

    
1443

    
1444
  def CheckPrereq(self):
1445
    """Check prerequisites.
1446

1447
    """
1448
    # The validation of the node list is done in the _GetWantedNodes,
1449
    # if non empty, and if empty, there's no validation to do
1450
    pass
1451

    
1452
  def Exec(self, feedback_fn):
1453
    """Computes the list of nodes and their attributes.
1454

1455
    """
1456
    all_info = self.cfg.GetAllNodesInfo()
1457
    if self.do_locking:
1458
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1459
    elif self.wanted != locking.ALL_SET:
1460
      nodenames = self.wanted
1461
      missing = set(nodenames).difference(all_info.keys())
1462
      if missing:
1463
        raise errors.OpExecError(
1464
          "Some nodes were removed before retrieving their data: %s" % missing)
1465
    else:
1466
      nodenames = all_info.keys()
1467
    nodelist = [all_info[name] for name in nodenames]
1468

    
1469
    # begin data gathering
1470

    
1471
    if self.dynamic_fields.intersection(self.op.output_fields):
1472
      live_data = {}
1473
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1474
      for name in nodenames:
1475
        nodeinfo = node_data.get(name, None)
1476
        if nodeinfo:
1477
          live_data[name] = {
1478
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1479
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1480
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1481
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1482
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1483
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1484
            "bootid": nodeinfo['bootid'],
1485
            }
1486
        else:
1487
          live_data[name] = {}
1488
    else:
1489
      live_data = dict.fromkeys(nodenames, {})
1490

    
1491
    node_to_primary = dict([(name, set()) for name in nodenames])
1492
    node_to_secondary = dict([(name, set()) for name in nodenames])
1493

    
1494
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1495
                             "sinst_cnt", "sinst_list"))
1496
    if inst_fields & frozenset(self.op.output_fields):
1497
      instancelist = self.cfg.GetInstanceList()
1498

    
1499
      for instance_name in instancelist:
1500
        inst = self.cfg.GetInstanceInfo(instance_name)
1501
        if inst.primary_node in node_to_primary:
1502
          node_to_primary[inst.primary_node].add(inst.name)
1503
        for secnode in inst.secondary_nodes:
1504
          if secnode in node_to_secondary:
1505
            node_to_secondary[secnode].add(inst.name)
1506

    
1507
    # end data gathering
1508

    
1509
    output = []
1510
    for node in nodelist:
1511
      node_output = []
1512
      for field in self.op.output_fields:
1513
        if field == "name":
1514
          val = node.name
1515
        elif field == "pinst_list":
1516
          val = list(node_to_primary[node.name])
1517
        elif field == "sinst_list":
1518
          val = list(node_to_secondary[node.name])
1519
        elif field == "pinst_cnt":
1520
          val = len(node_to_primary[node.name])
1521
        elif field == "sinst_cnt":
1522
          val = len(node_to_secondary[node.name])
1523
        elif field == "pip":
1524
          val = node.primary_ip
1525
        elif field == "sip":
1526
          val = node.secondary_ip
1527
        elif field == "tags":
1528
          val = list(node.GetTags())
1529
        elif field == "serial_no":
1530
          val = node.serial_no
1531
        elif field in self.dynamic_fields:
1532
          val = live_data[node.name].get(field, None)
1533
        else:
1534
          raise errors.ParameterError(field)
1535
        node_output.append(val)
1536
      output.append(node_output)
1537

    
1538
    return output
1539

    
1540

    
1541
class LUQueryNodeVolumes(NoHooksLU):
1542
  """Logical unit for getting volumes on node(s).
1543

1544
  """
1545
  _OP_REQP = ["nodes", "output_fields"]
1546
  REQ_BGL = False
1547

    
1548
  def ExpandNames(self):
1549
    _CheckOutputFields(static=["node"],
1550
                       dynamic=["phys", "vg", "name", "size", "instance"],
1551
                       selected=self.op.output_fields)
1552

    
1553
    self.needed_locks = {}
1554
    self.share_locks[locking.LEVEL_NODE] = 1
1555
    if not self.op.nodes:
1556
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1557
    else:
1558
      self.needed_locks[locking.LEVEL_NODE] = \
1559
        _GetWantedNodes(self, self.op.nodes)
1560

    
1561
  def CheckPrereq(self):
1562
    """Check prerequisites.
1563

1564
    This checks that the fields required are valid output fields.
1565

1566
    """
1567
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1568

    
1569
  def Exec(self, feedback_fn):
1570
    """Computes the list of nodes and their attributes.
1571

1572
    """
1573
    nodenames = self.nodes
1574
    volumes = rpc.call_node_volumes(nodenames)
1575

    
1576
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1577
             in self.cfg.GetInstanceList()]
1578

    
1579
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1580

    
1581
    output = []
1582
    for node in nodenames:
1583
      if node not in volumes or not volumes[node]:
1584
        continue
1585

    
1586
      node_vols = volumes[node][:]
1587
      node_vols.sort(key=lambda vol: vol['dev'])
1588

    
1589
      for vol in node_vols:
1590
        node_output = []
1591
        for field in self.op.output_fields:
1592
          if field == "node":
1593
            val = node
1594
          elif field == "phys":
1595
            val = vol['dev']
1596
          elif field == "vg":
1597
            val = vol['vg']
1598
          elif field == "name":
1599
            val = vol['name']
1600
          elif field == "size":
1601
            val = int(float(vol['size']))
1602
          elif field == "instance":
1603
            for inst in ilist:
1604
              if node not in lv_by_node[inst]:
1605
                continue
1606
              if vol['name'] in lv_by_node[inst][node]:
1607
                val = inst.name
1608
                break
1609
            else:
1610
              val = '-'
1611
          else:
1612
            raise errors.ParameterError(field)
1613
          node_output.append(str(val))
1614

    
1615
        output.append(node_output)
1616

    
1617
    return output
1618

    
1619

    
1620
class LUAddNode(LogicalUnit):
1621
  """Logical unit for adding node to the cluster.
1622

1623
  """
1624
  HPATH = "node-add"
1625
  HTYPE = constants.HTYPE_NODE
1626
  _OP_REQP = ["node_name"]
1627

    
1628
  def BuildHooksEnv(self):
1629
    """Build hooks env.
1630

1631
    This will run on all nodes before, and on all nodes + the new node after.
1632

1633
    """
1634
    env = {
1635
      "OP_TARGET": self.op.node_name,
1636
      "NODE_NAME": self.op.node_name,
1637
      "NODE_PIP": self.op.primary_ip,
1638
      "NODE_SIP": self.op.secondary_ip,
1639
      }
1640
    nodes_0 = self.cfg.GetNodeList()
1641
    nodes_1 = nodes_0 + [self.op.node_name, ]
1642
    return env, nodes_0, nodes_1
1643

    
1644
  def CheckPrereq(self):
1645
    """Check prerequisites.
1646

1647
    This checks:
1648
     - the new node is not already in the config
1649
     - it is resolvable
1650
     - its parameters (single/dual homed) matches the cluster
1651

1652
    Any errors are signalled by raising errors.OpPrereqError.
1653

1654
    """
1655
    node_name = self.op.node_name
1656
    cfg = self.cfg
1657

    
1658
    dns_data = utils.HostInfo(node_name)
1659

    
1660
    node = dns_data.name
1661
    primary_ip = self.op.primary_ip = dns_data.ip
1662
    secondary_ip = getattr(self.op, "secondary_ip", None)
1663
    if secondary_ip is None:
1664
      secondary_ip = primary_ip
1665
    if not utils.IsValidIP(secondary_ip):
1666
      raise errors.OpPrereqError("Invalid secondary IP given")
1667
    self.op.secondary_ip = secondary_ip
1668

    
1669
    node_list = cfg.GetNodeList()
1670
    if not self.op.readd and node in node_list:
1671
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1672
                                 node)
1673
    elif self.op.readd and node not in node_list:
1674
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1675

    
1676
    for existing_node_name in node_list:
1677
      existing_node = cfg.GetNodeInfo(existing_node_name)
1678

    
1679
      if self.op.readd and node == existing_node_name:
1680
        if (existing_node.primary_ip != primary_ip or
1681
            existing_node.secondary_ip != secondary_ip):
1682
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1683
                                     " address configuration as before")
1684
        continue
1685

    
1686
      if (existing_node.primary_ip == primary_ip or
1687
          existing_node.secondary_ip == primary_ip or
1688
          existing_node.primary_ip == secondary_ip or
1689
          existing_node.secondary_ip == secondary_ip):
1690
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1691
                                   " existing node %s" % existing_node.name)
1692

    
1693
    # check that the type of the node (single versus dual homed) is the
1694
    # same as for the master
1695
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1696
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1697
    newbie_singlehomed = secondary_ip == primary_ip
1698
    if master_singlehomed != newbie_singlehomed:
1699
      if master_singlehomed:
1700
        raise errors.OpPrereqError("The master has no private ip but the"
1701
                                   " new node has one")
1702
      else:
1703
        raise errors.OpPrereqError("The master has a private ip but the"
1704
                                   " new node doesn't have one")
1705

    
1706
    # checks reachablity
1707
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1708
      raise errors.OpPrereqError("Node not reachable by ping")
1709

    
1710
    if not newbie_singlehomed:
1711
      # check reachability from my secondary ip to newbie's secondary ip
1712
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1713
                           source=myself.secondary_ip):
1714
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1715
                                   " based ping to noded port")
1716

    
1717
    self.new_node = objects.Node(name=node,
1718
                                 primary_ip=primary_ip,
1719
                                 secondary_ip=secondary_ip)
1720

    
1721
  def Exec(self, feedback_fn):
1722
    """Adds the new node to the cluster.
1723

1724
    """
1725
    new_node = self.new_node
1726
    node = new_node.name
1727

    
1728
    # check connectivity
1729
    result = rpc.call_version([node])[node]
1730
    if result:
1731
      if constants.PROTOCOL_VERSION == result:
1732
        logger.Info("communication to node %s fine, sw version %s match" %
1733
                    (node, result))
1734
      else:
1735
        raise errors.OpExecError("Version mismatch master version %s,"
1736
                                 " node version %s" %
1737
                                 (constants.PROTOCOL_VERSION, result))
1738
    else:
1739
      raise errors.OpExecError("Cannot get version from the new node")
1740

    
1741
    # setup ssh on node
1742
    logger.Info("copy ssh key to node %s" % node)
1743
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1744
    keyarray = []
1745
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1746
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1747
                priv_key, pub_key]
1748

    
1749
    for i in keyfiles:
1750
      f = open(i, 'r')
1751
      try:
1752
        keyarray.append(f.read())
1753
      finally:
1754
        f.close()
1755

    
1756
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1757
                               keyarray[3], keyarray[4], keyarray[5])
1758

    
1759
    if not result:
1760
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1761

    
1762
    # Add node to our /etc/hosts, and add key to known_hosts
1763
    utils.AddHostToEtcHosts(new_node.name)
1764

    
1765
    if new_node.secondary_ip != new_node.primary_ip:
1766
      if not rpc.call_node_tcp_ping(new_node.name,
1767
                                    constants.LOCALHOST_IP_ADDRESS,
1768
                                    new_node.secondary_ip,
1769
                                    constants.DEFAULT_NODED_PORT,
1770
                                    10, False):
1771
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1772
                                 " you gave (%s). Please fix and re-run this"
1773
                                 " command." % new_node.secondary_ip)
1774

    
1775
    node_verify_list = [self.cfg.GetMasterNode()]
1776
    node_verify_param = {
1777
      'nodelist': [node],
1778
      # TODO: do a node-net-test as well?
1779
    }
1780

    
1781
    result = rpc.call_node_verify(node_verify_list, node_verify_param,
1782
                                  self.cfg.GetClusterName())
1783
    for verifier in node_verify_list:
1784
      if not result[verifier]:
1785
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1786
                                 " for remote verification" % verifier)
1787
      if result[verifier]['nodelist']:
1788
        for failed in result[verifier]['nodelist']:
1789
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1790
                      (verifier, result[verifier]['nodelist'][failed]))
1791
        raise errors.OpExecError("ssh/hostname verification failed.")
1792

    
1793
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1794
    # including the node just added
1795
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1796
    dist_nodes = self.cfg.GetNodeList()
1797
    if not self.op.readd:
1798
      dist_nodes.append(node)
1799
    if myself.name in dist_nodes:
1800
      dist_nodes.remove(myself.name)
1801

    
1802
    logger.Debug("Copying hosts and known_hosts to all nodes")
1803
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1804
      result = rpc.call_upload_file(dist_nodes, fname)
1805
      for to_node in dist_nodes:
1806
        if not result[to_node]:
1807
          logger.Error("copy of file %s to node %s failed" %
1808
                       (fname, to_node))
1809

    
1810
    to_copy = []
1811
    if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
1812
      to_copy.append(constants.VNC_PASSWORD_FILE)
1813
    for fname in to_copy:
1814
      result = rpc.call_upload_file([node], fname)
1815
      if not result[node]:
1816
        logger.Error("could not copy file %s to node %s" % (fname, node))
1817

    
1818
    if self.op.readd:
1819
      self.context.ReaddNode(new_node)
1820
    else:
1821
      self.context.AddNode(new_node)
1822

    
1823

    
1824
class LUQueryClusterInfo(NoHooksLU):
1825
  """Query cluster configuration.
1826

1827
  """
1828
  _OP_REQP = []
1829
  REQ_MASTER = False
1830
  REQ_BGL = False
1831

    
1832
  def ExpandNames(self):
1833
    self.needed_locks = {}
1834

    
1835
  def CheckPrereq(self):
1836
    """No prerequsites needed for this LU.
1837

1838
    """
1839
    pass
1840

    
1841
  def Exec(self, feedback_fn):
1842
    """Return cluster config.
1843

1844
    """
1845
    result = {
1846
      "name": self.cfg.GetClusterName(),
1847
      "software_version": constants.RELEASE_VERSION,
1848
      "protocol_version": constants.PROTOCOL_VERSION,
1849
      "config_version": constants.CONFIG_VERSION,
1850
      "os_api_version": constants.OS_API_VERSION,
1851
      "export_version": constants.EXPORT_VERSION,
1852
      "master": self.cfg.GetMasterNode(),
1853
      "architecture": (platform.architecture()[0], platform.machine()),
1854
      "hypervisor_type": self.cfg.GetHypervisorType(),
1855
      }
1856

    
1857
    return result
1858

    
1859

    
1860
class LUQueryConfigValues(NoHooksLU):
1861
  """Return configuration values.
1862

1863
  """
1864
  _OP_REQP = []
1865
  REQ_BGL = False
1866

    
1867
  def ExpandNames(self):
1868
    self.needed_locks = {}
1869

    
1870
    static_fields = ["cluster_name", "master_node"]
1871
    _CheckOutputFields(static=static_fields,
1872
                       dynamic=[],
1873
                       selected=self.op.output_fields)
1874

    
1875
  def CheckPrereq(self):
1876
    """No prerequisites.
1877

1878
    """
1879
    pass
1880

    
1881
  def Exec(self, feedback_fn):
1882
    """Dump a representation of the cluster config to the standard output.
1883

1884
    """
1885
    values = []
1886
    for field in self.op.output_fields:
1887
      if field == "cluster_name":
1888
        values.append(self.cfg.GetClusterName())
1889
      elif field == "master_node":
1890
        values.append(self.cfg.GetMasterNode())
1891
      else:
1892
        raise errors.ParameterError(field)
1893
    return values
1894

    
1895

    
1896
class LUActivateInstanceDisks(NoHooksLU):
1897
  """Bring up an instance's disks.
1898

1899
  """
1900
  _OP_REQP = ["instance_name"]
1901
  REQ_BGL = False
1902

    
1903
  def ExpandNames(self):
1904
    self._ExpandAndLockInstance()
1905
    self.needed_locks[locking.LEVEL_NODE] = []
1906
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1907

    
1908
  def DeclareLocks(self, level):
1909
    if level == locking.LEVEL_NODE:
1910
      self._LockInstancesNodes()
1911

    
1912
  def CheckPrereq(self):
1913
    """Check prerequisites.
1914

1915
    This checks that the instance is in the cluster.
1916

1917
    """
1918
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1919
    assert self.instance is not None, \
1920
      "Cannot retrieve locked instance %s" % self.op.instance_name
1921

    
1922
  def Exec(self, feedback_fn):
1923
    """Activate the disks.
1924

1925
    """
1926
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1927
    if not disks_ok:
1928
      raise errors.OpExecError("Cannot activate block devices")
1929

    
1930
    return disks_info
1931

    
1932

    
1933
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1934
  """Prepare the block devices for an instance.
1935

1936
  This sets up the block devices on all nodes.
1937

1938
  Args:
1939
    instance: a ganeti.objects.Instance object
1940
    ignore_secondaries: if true, errors on secondary nodes won't result
1941
                        in an error return from the function
1942

1943
  Returns:
1944
    false if the operation failed
1945
    list of (host, instance_visible_name, node_visible_name) if the operation
1946
         suceeded with the mapping from node devices to instance devices
1947
  """
1948
  device_info = []
1949
  disks_ok = True
1950
  iname = instance.name
1951
  # With the two passes mechanism we try to reduce the window of
1952
  # opportunity for the race condition of switching DRBD to primary
1953
  # before handshaking occured, but we do not eliminate it
1954

    
1955
  # The proper fix would be to wait (with some limits) until the
1956
  # connection has been made and drbd transitions from WFConnection
1957
  # into any other network-connected state (Connected, SyncTarget,
1958
  # SyncSource, etc.)
1959

    
1960
  # 1st pass, assemble on all nodes in secondary mode
1961
  for inst_disk in instance.disks:
1962
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1963
      cfg.SetDiskID(node_disk, node)
1964
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1965
      if not result:
1966
        logger.Error("could not prepare block device %s on node %s"
1967
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1968
        if not ignore_secondaries:
1969
          disks_ok = False
1970

    
1971
  # FIXME: race condition on drbd migration to primary
1972

    
1973
  # 2nd pass, do only the primary node
1974
  for inst_disk in instance.disks:
1975
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1976
      if node != instance.primary_node:
1977
        continue
1978
      cfg.SetDiskID(node_disk, node)
1979
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1980
      if not result:
1981
        logger.Error("could not prepare block device %s on node %s"
1982
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1983
        disks_ok = False
1984
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1985

    
1986
  # leave the disks configured for the primary node
1987
  # this is a workaround that would be fixed better by
1988
  # improving the logical/physical id handling
1989
  for disk in instance.disks:
1990
    cfg.SetDiskID(disk, instance.primary_node)
1991

    
1992
  return disks_ok, device_info
1993

    
1994

    
1995
def _StartInstanceDisks(cfg, instance, force):
1996
  """Start the disks of an instance.
1997

1998
  """
1999
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2000
                                           ignore_secondaries=force)
2001
  if not disks_ok:
2002
    _ShutdownInstanceDisks(instance, cfg)
2003
    if force is not None and not force:
2004
      logger.Error("If the message above refers to a secondary node,"
2005
                   " you can retry the operation using '--force'.")
2006
    raise errors.OpExecError("Disk consistency error")
2007

    
2008

    
2009
class LUDeactivateInstanceDisks(NoHooksLU):
2010
  """Shutdown an instance's disks.
2011

2012
  """
2013
  _OP_REQP = ["instance_name"]
2014
  REQ_BGL = False
2015

    
2016
  def ExpandNames(self):
2017
    self._ExpandAndLockInstance()
2018
    self.needed_locks[locking.LEVEL_NODE] = []
2019
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2020

    
2021
  def DeclareLocks(self, level):
2022
    if level == locking.LEVEL_NODE:
2023
      self._LockInstancesNodes()
2024

    
2025
  def CheckPrereq(self):
2026
    """Check prerequisites.
2027

2028
    This checks that the instance is in the cluster.
2029

2030
    """
2031
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2032
    assert self.instance is not None, \
2033
      "Cannot retrieve locked instance %s" % self.op.instance_name
2034

    
2035
  def Exec(self, feedback_fn):
2036
    """Deactivate the disks
2037

2038
    """
2039
    instance = self.instance
2040
    _SafeShutdownInstanceDisks(instance, self.cfg)
2041

    
2042

    
2043
def _SafeShutdownInstanceDisks(instance, cfg):
2044
  """Shutdown block devices of an instance.
2045

2046
  This function checks if an instance is running, before calling
2047
  _ShutdownInstanceDisks.
2048

2049
  """
2050
  ins_l = rpc.call_instance_list([instance.primary_node])
2051
  ins_l = ins_l[instance.primary_node]
2052
  if not type(ins_l) is list:
2053
    raise errors.OpExecError("Can't contact node '%s'" %
2054
                             instance.primary_node)
2055

    
2056
  if instance.name in ins_l:
2057
    raise errors.OpExecError("Instance is running, can't shutdown"
2058
                             " block devices.")
2059

    
2060
  _ShutdownInstanceDisks(instance, cfg)
2061

    
2062

    
2063
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2064
  """Shutdown block devices of an instance.
2065

2066
  This does the shutdown on all nodes of the instance.
2067

2068
  If the ignore_primary is false, errors on the primary node are
2069
  ignored.
2070

2071
  """
2072
  result = True
2073
  for disk in instance.disks:
2074
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2075
      cfg.SetDiskID(top_disk, node)
2076
      if not rpc.call_blockdev_shutdown(node, top_disk):
2077
        logger.Error("could not shutdown block device %s on node %s" %
2078
                     (disk.iv_name, node))
2079
        if not ignore_primary or node != instance.primary_node:
2080
          result = False
2081
  return result
2082

    
2083

    
2084
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2085
  """Checks if a node has enough free memory.
2086

2087
  This function check if a given node has the needed amount of free
2088
  memory. In case the node has less memory or we cannot get the
2089
  information from the node, this function raise an OpPrereqError
2090
  exception.
2091

2092
  Args:
2093
    - cfg: a ConfigWriter instance
2094
    - node: the node name
2095
    - reason: string to use in the error message
2096
    - requested: the amount of memory in MiB
2097

2098
  """
2099
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2100
  if not nodeinfo or not isinstance(nodeinfo, dict):
2101
    raise errors.OpPrereqError("Could not contact node %s for resource"
2102
                             " information" % (node,))
2103

    
2104
  free_mem = nodeinfo[node].get('memory_free')
2105
  if not isinstance(free_mem, int):
2106
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2107
                             " was '%s'" % (node, free_mem))
2108
  if requested > free_mem:
2109
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2110
                             " needed %s MiB, available %s MiB" %
2111
                             (node, reason, requested, free_mem))
2112

    
2113

    
2114
class LUStartupInstance(LogicalUnit):
2115
  """Starts an instance.
2116

2117
  """
2118
  HPATH = "instance-start"
2119
  HTYPE = constants.HTYPE_INSTANCE
2120
  _OP_REQP = ["instance_name", "force"]
2121
  REQ_BGL = False
2122

    
2123
  def ExpandNames(self):
2124
    self._ExpandAndLockInstance()
2125
    self.needed_locks[locking.LEVEL_NODE] = []
2126
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2127

    
2128
  def DeclareLocks(self, level):
2129
    if level == locking.LEVEL_NODE:
2130
      self._LockInstancesNodes()
2131

    
2132
  def BuildHooksEnv(self):
2133
    """Build hooks env.
2134

2135
    This runs on master, primary and secondary nodes of the instance.
2136

2137
    """
2138
    env = {
2139
      "FORCE": self.op.force,
2140
      }
2141
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2142
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2143
          list(self.instance.secondary_nodes))
2144
    return env, nl, nl
2145

    
2146
  def CheckPrereq(self):
2147
    """Check prerequisites.
2148

2149
    This checks that the instance is in the cluster.
2150

2151
    """
2152
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2153
    assert self.instance is not None, \
2154
      "Cannot retrieve locked instance %s" % self.op.instance_name
2155

    
2156
    # check bridges existance
2157
    _CheckInstanceBridgesExist(instance)
2158

    
2159
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2160
                         "starting instance %s" % instance.name,
2161
                         instance.memory)
2162

    
2163
  def Exec(self, feedback_fn):
2164
    """Start the instance.
2165

2166
    """
2167
    instance = self.instance
2168
    force = self.op.force
2169
    extra_args = getattr(self.op, "extra_args", "")
2170

    
2171
    self.cfg.MarkInstanceUp(instance.name)
2172

    
2173
    node_current = instance.primary_node
2174

    
2175
    _StartInstanceDisks(self.cfg, instance, force)
2176

    
2177
    if not rpc.call_instance_start(node_current, instance, extra_args):
2178
      _ShutdownInstanceDisks(instance, self.cfg)
2179
      raise errors.OpExecError("Could not start instance")
2180

    
2181

    
2182
class LURebootInstance(LogicalUnit):
2183
  """Reboot an instance.
2184

2185
  """
2186
  HPATH = "instance-reboot"
2187
  HTYPE = constants.HTYPE_INSTANCE
2188
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2189
  REQ_BGL = False
2190

    
2191
  def ExpandNames(self):
2192
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2193
                                   constants.INSTANCE_REBOOT_HARD,
2194
                                   constants.INSTANCE_REBOOT_FULL]:
2195
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2196
                                  (constants.INSTANCE_REBOOT_SOFT,
2197
                                   constants.INSTANCE_REBOOT_HARD,
2198
                                   constants.INSTANCE_REBOOT_FULL))
2199
    self._ExpandAndLockInstance()
2200
    self.needed_locks[locking.LEVEL_NODE] = []
2201
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2202

    
2203
  def DeclareLocks(self, level):
2204
    if level == locking.LEVEL_NODE:
2205
      primary_only = not constants.INSTANCE_REBOOT_FULL
2206
      self._LockInstancesNodes(primary_only=primary_only)
2207

    
2208
  def BuildHooksEnv(self):
2209
    """Build hooks env.
2210

2211
    This runs on master, primary and secondary nodes of the instance.
2212

2213
    """
2214
    env = {
2215
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2216
      }
2217
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2218
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2219
          list(self.instance.secondary_nodes))
2220
    return env, nl, nl
2221

    
2222
  def CheckPrereq(self):
2223
    """Check prerequisites.
2224

2225
    This checks that the instance is in the cluster.
2226

2227
    """
2228
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2229
    assert self.instance is not None, \
2230
      "Cannot retrieve locked instance %s" % self.op.instance_name
2231

    
2232
    # check bridges existance
2233
    _CheckInstanceBridgesExist(instance)
2234

    
2235
  def Exec(self, feedback_fn):
2236
    """Reboot the instance.
2237

2238
    """
2239
    instance = self.instance
2240
    ignore_secondaries = self.op.ignore_secondaries
2241
    reboot_type = self.op.reboot_type
2242
    extra_args = getattr(self.op, "extra_args", "")
2243

    
2244
    node_current = instance.primary_node
2245

    
2246
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2247
                       constants.INSTANCE_REBOOT_HARD]:
2248
      if not rpc.call_instance_reboot(node_current, instance,
2249
                                      reboot_type, extra_args):
2250
        raise errors.OpExecError("Could not reboot instance")
2251
    else:
2252
      if not rpc.call_instance_shutdown(node_current, instance):
2253
        raise errors.OpExecError("could not shutdown instance for full reboot")
2254
      _ShutdownInstanceDisks(instance, self.cfg)
2255
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2256
      if not rpc.call_instance_start(node_current, instance, extra_args):
2257
        _ShutdownInstanceDisks(instance, self.cfg)
2258
        raise errors.OpExecError("Could not start instance for full reboot")
2259

    
2260
    self.cfg.MarkInstanceUp(instance.name)
2261

    
2262

    
2263
class LUShutdownInstance(LogicalUnit):
2264
  """Shutdown an instance.
2265

2266
  """
2267
  HPATH = "instance-stop"
2268
  HTYPE = constants.HTYPE_INSTANCE
2269
  _OP_REQP = ["instance_name"]
2270
  REQ_BGL = False
2271

    
2272
  def ExpandNames(self):
2273
    self._ExpandAndLockInstance()
2274
    self.needed_locks[locking.LEVEL_NODE] = []
2275
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2276

    
2277
  def DeclareLocks(self, level):
2278
    if level == locking.LEVEL_NODE:
2279
      self._LockInstancesNodes()
2280

    
2281
  def BuildHooksEnv(self):
2282
    """Build hooks env.
2283

2284
    This runs on master, primary and secondary nodes of the instance.
2285

2286
    """
2287
    env = _BuildInstanceHookEnvByObject(self.instance)
2288
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2289
          list(self.instance.secondary_nodes))
2290
    return env, nl, nl
2291

    
2292
  def CheckPrereq(self):
2293
    """Check prerequisites.
2294

2295
    This checks that the instance is in the cluster.
2296

2297
    """
2298
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2299
    assert self.instance is not None, \
2300
      "Cannot retrieve locked instance %s" % self.op.instance_name
2301

    
2302
  def Exec(self, feedback_fn):
2303
    """Shutdown the instance.
2304

2305
    """
2306
    instance = self.instance
2307
    node_current = instance.primary_node
2308
    self.cfg.MarkInstanceDown(instance.name)
2309
    if not rpc.call_instance_shutdown(node_current, instance):
2310
      logger.Error("could not shutdown instance")
2311

    
2312
    _ShutdownInstanceDisks(instance, self.cfg)
2313

    
2314

    
2315
class LUReinstallInstance(LogicalUnit):
2316
  """Reinstall an instance.
2317

2318
  """
2319
  HPATH = "instance-reinstall"
2320
  HTYPE = constants.HTYPE_INSTANCE
2321
  _OP_REQP = ["instance_name"]
2322
  REQ_BGL = False
2323

    
2324
  def ExpandNames(self):
2325
    self._ExpandAndLockInstance()
2326
    self.needed_locks[locking.LEVEL_NODE] = []
2327
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2328

    
2329
  def DeclareLocks(self, level):
2330
    if level == locking.LEVEL_NODE:
2331
      self._LockInstancesNodes()
2332

    
2333
  def BuildHooksEnv(self):
2334
    """Build hooks env.
2335

2336
    This runs on master, primary and secondary nodes of the instance.
2337

2338
    """
2339
    env = _BuildInstanceHookEnvByObject(self.instance)
2340
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2341
          list(self.instance.secondary_nodes))
2342
    return env, nl, nl
2343

    
2344
  def CheckPrereq(self):
2345
    """Check prerequisites.
2346

2347
    This checks that the instance is in the cluster and is not running.
2348

2349
    """
2350
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2351
    assert instance is not None, \
2352
      "Cannot retrieve locked instance %s" % self.op.instance_name
2353

    
2354
    if instance.disk_template == constants.DT_DISKLESS:
2355
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2356
                                 self.op.instance_name)
2357
    if instance.status != "down":
2358
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2359
                                 self.op.instance_name)
2360
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2361
    if remote_info:
2362
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2363
                                 (self.op.instance_name,
2364
                                  instance.primary_node))
2365

    
2366
    self.op.os_type = getattr(self.op, "os_type", None)
2367
    if self.op.os_type is not None:
2368
      # OS verification
2369
      pnode = self.cfg.GetNodeInfo(
2370
        self.cfg.ExpandNodeName(instance.primary_node))
2371
      if pnode is None:
2372
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2373
                                   self.op.pnode)
2374
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2375
      if not os_obj:
2376
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2377
                                   " primary node"  % self.op.os_type)
2378

    
2379
    self.instance = instance
2380

    
2381
  def Exec(self, feedback_fn):
2382
    """Reinstall the instance.
2383

2384
    """
2385
    inst = self.instance
2386

    
2387
    if self.op.os_type is not None:
2388
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2389
      inst.os = self.op.os_type
2390
      self.cfg.Update(inst)
2391

    
2392
    _StartInstanceDisks(self.cfg, inst, None)
2393
    try:
2394
      feedback_fn("Running the instance OS create scripts...")
2395
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2396
        raise errors.OpExecError("Could not install OS for instance %s"
2397
                                 " on node %s" %
2398
                                 (inst.name, inst.primary_node))
2399
    finally:
2400
      _ShutdownInstanceDisks(inst, self.cfg)
2401

    
2402

    
2403
class LURenameInstance(LogicalUnit):
2404
  """Rename an instance.
2405

2406
  """
2407
  HPATH = "instance-rename"
2408
  HTYPE = constants.HTYPE_INSTANCE
2409
  _OP_REQP = ["instance_name", "new_name"]
2410

    
2411
  def BuildHooksEnv(self):
2412
    """Build hooks env.
2413

2414
    This runs on master, primary and secondary nodes of the instance.
2415

2416
    """
2417
    env = _BuildInstanceHookEnvByObject(self.instance)
2418
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2419
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420
          list(self.instance.secondary_nodes))
2421
    return env, nl, nl
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the instance is in the cluster and is not running.
2427

2428
    """
2429
    instance = self.cfg.GetInstanceInfo(
2430
      self.cfg.ExpandInstanceName(self.op.instance_name))
2431
    if instance is None:
2432
      raise errors.OpPrereqError("Instance '%s' not known" %
2433
                                 self.op.instance_name)
2434
    if instance.status != "down":
2435
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2436
                                 self.op.instance_name)
2437
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2438
    if remote_info:
2439
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2440
                                 (self.op.instance_name,
2441
                                  instance.primary_node))
2442
    self.instance = instance
2443

    
2444
    # new name verification
2445
    name_info = utils.HostInfo(self.op.new_name)
2446

    
2447
    self.op.new_name = new_name = name_info.name
2448
    instance_list = self.cfg.GetInstanceList()
2449
    if new_name in instance_list:
2450
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2451
                                 new_name)
2452

    
2453
    if not getattr(self.op, "ignore_ip", False):
2454
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2455
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2456
                                   (name_info.ip, new_name))
2457

    
2458

    
2459
  def Exec(self, feedback_fn):
2460
    """Reinstall the instance.
2461

2462
    """
2463
    inst = self.instance
2464
    old_name = inst.name
2465

    
2466
    if inst.disk_template == constants.DT_FILE:
2467
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2468

    
2469
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2470
    # Change the instance lock. This is definitely safe while we hold the BGL
2471
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2472
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2473

    
2474
    # re-read the instance from the configuration after rename
2475
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2476

    
2477
    if inst.disk_template == constants.DT_FILE:
2478
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2479
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2480
                                                old_file_storage_dir,
2481
                                                new_file_storage_dir)
2482

    
2483
      if not result:
2484
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2485
                                 " directory '%s' to '%s' (but the instance"
2486
                                 " has been renamed in Ganeti)" % (
2487
                                 inst.primary_node, old_file_storage_dir,
2488
                                 new_file_storage_dir))
2489

    
2490
      if not result[0]:
2491
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2492
                                 " (but the instance has been renamed in"
2493
                                 " Ganeti)" % (old_file_storage_dir,
2494
                                               new_file_storage_dir))
2495

    
2496
    _StartInstanceDisks(self.cfg, inst, None)
2497
    try:
2498
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2499
                                          "sda", "sdb"):
2500
        msg = ("Could not run OS rename script for instance %s on node %s"
2501
               " (but the instance has been renamed in Ganeti)" %
2502
               (inst.name, inst.primary_node))
2503
        logger.Error(msg)
2504
    finally:
2505
      _ShutdownInstanceDisks(inst, self.cfg)
2506

    
2507

    
2508
class LURemoveInstance(LogicalUnit):
2509
  """Remove an instance.
2510

2511
  """
2512
  HPATH = "instance-remove"
2513
  HTYPE = constants.HTYPE_INSTANCE
2514
  _OP_REQP = ["instance_name", "ignore_failures"]
2515
  REQ_BGL = False
2516

    
2517
  def ExpandNames(self):
2518
    self._ExpandAndLockInstance()
2519
    self.needed_locks[locking.LEVEL_NODE] = []
2520
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2521

    
2522
  def DeclareLocks(self, level):
2523
    if level == locking.LEVEL_NODE:
2524
      self._LockInstancesNodes()
2525

    
2526
  def BuildHooksEnv(self):
2527
    """Build hooks env.
2528

2529
    This runs on master, primary and secondary nodes of the instance.
2530

2531
    """
2532
    env = _BuildInstanceHookEnvByObject(self.instance)
2533
    nl = [self.cfg.GetMasterNode()]
2534
    return env, nl, nl
2535

    
2536
  def CheckPrereq(self):
2537
    """Check prerequisites.
2538

2539
    This checks that the instance is in the cluster.
2540

2541
    """
2542
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2543
    assert self.instance is not None, \
2544
      "Cannot retrieve locked instance %s" % self.op.instance_name
2545

    
2546
  def Exec(self, feedback_fn):
2547
    """Remove the instance.
2548

2549
    """
2550
    instance = self.instance
2551
    logger.Info("shutting down instance %s on node %s" %
2552
                (instance.name, instance.primary_node))
2553

    
2554
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2555
      if self.op.ignore_failures:
2556
        feedback_fn("Warning: can't shutdown instance")
2557
      else:
2558
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2559
                                 (instance.name, instance.primary_node))
2560

    
2561
    logger.Info("removing block devices for instance %s" % instance.name)
2562

    
2563
    if not _RemoveDisks(instance, self.cfg):
2564
      if self.op.ignore_failures:
2565
        feedback_fn("Warning: can't remove instance's disks")
2566
      else:
2567
        raise errors.OpExecError("Can't remove instance's disks")
2568

    
2569
    logger.Info("removing instance %s out of cluster config" % instance.name)
2570

    
2571
    self.cfg.RemoveInstance(instance.name)
2572
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2573

    
2574

    
2575
class LUQueryInstances(NoHooksLU):
2576
  """Logical unit for querying instances.
2577

2578
  """
2579
  _OP_REQP = ["output_fields", "names"]
2580
  REQ_BGL = False
2581

    
2582
  def ExpandNames(self):
2583
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2584
    self.static_fields = frozenset([
2585
      "name", "os", "pnode", "snodes",
2586
      "admin_state", "admin_ram",
2587
      "disk_template", "ip", "mac", "bridge",
2588
      "sda_size", "sdb_size", "vcpus", "tags",
2589
      "network_port", "kernel_path", "initrd_path",
2590
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2591
      "hvm_cdrom_image_path", "hvm_nic_type",
2592
      "hvm_disk_type", "vnc_bind_address",
2593
      "serial_no",
2594
      ])
2595
    _CheckOutputFields(static=self.static_fields,
2596
                       dynamic=self.dynamic_fields,
2597
                       selected=self.op.output_fields)
2598

    
2599
    self.needed_locks = {}
2600
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2601
    self.share_locks[locking.LEVEL_NODE] = 1
2602

    
2603
    if self.op.names:
2604
      self.wanted = _GetWantedInstances(self, self.op.names)
2605
    else:
2606
      self.wanted = locking.ALL_SET
2607

    
2608
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2609
    if self.do_locking:
2610
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2611
      self.needed_locks[locking.LEVEL_NODE] = []
2612
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2613

    
2614
  def DeclareLocks(self, level):
2615
    if level == locking.LEVEL_NODE and self.do_locking:
2616
      self._LockInstancesNodes()
2617

    
2618
  def CheckPrereq(self):
2619
    """Check prerequisites.
2620

2621
    """
2622
    pass
2623

    
2624
  def Exec(self, feedback_fn):
2625
    """Computes the list of nodes and their attributes.
2626

2627
    """
2628
    all_info = self.cfg.GetAllInstancesInfo()
2629
    if self.do_locking:
2630
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2631
    elif self.wanted != locking.ALL_SET:
2632
      instance_names = self.wanted
2633
      missing = set(instance_names).difference(all_info.keys())
2634
      if missing:
2635
        raise self.OpExecError(
2636
          "Some instances were removed before retrieving their data: %s"
2637
          % missing)
2638
    else:
2639
      instance_names = all_info.keys()
2640
    instance_list = [all_info[iname] for iname in instance_names]
2641

    
2642
    # begin data gathering
2643

    
2644
    nodes = frozenset([inst.primary_node for inst in instance_list])
2645

    
2646
    bad_nodes = []
2647
    if self.dynamic_fields.intersection(self.op.output_fields):
2648
      live_data = {}
2649
      node_data = rpc.call_all_instances_info(nodes)
2650
      for name in nodes:
2651
        result = node_data[name]
2652
        if result:
2653
          live_data.update(result)
2654
        elif result == False:
2655
          bad_nodes.append(name)
2656
        # else no instance is alive
2657
    else:
2658
      live_data = dict([(name, {}) for name in instance_names])
2659

    
2660
    # end data gathering
2661

    
2662
    output = []
2663
    for instance in instance_list:
2664
      iout = []
2665
      for field in self.op.output_fields:
2666
        if field == "name":
2667
          val = instance.name
2668
        elif field == "os":
2669
          val = instance.os
2670
        elif field == "pnode":
2671
          val = instance.primary_node
2672
        elif field == "snodes":
2673
          val = list(instance.secondary_nodes)
2674
        elif field == "admin_state":
2675
          val = (instance.status != "down")
2676
        elif field == "oper_state":
2677
          if instance.primary_node in bad_nodes:
2678
            val = None
2679
          else:
2680
            val = bool(live_data.get(instance.name))
2681
        elif field == "status":
2682
          if instance.primary_node in bad_nodes:
2683
            val = "ERROR_nodedown"
2684
          else:
2685
            running = bool(live_data.get(instance.name))
2686
            if running:
2687
              if instance.status != "down":
2688
                val = "running"
2689
              else:
2690
                val = "ERROR_up"
2691
            else:
2692
              if instance.status != "down":
2693
                val = "ERROR_down"
2694
              else:
2695
                val = "ADMIN_down"
2696
        elif field == "admin_ram":
2697
          val = instance.memory
2698
        elif field == "oper_ram":
2699
          if instance.primary_node in bad_nodes:
2700
            val = None
2701
          elif instance.name in live_data:
2702
            val = live_data[instance.name].get("memory", "?")
2703
          else:
2704
            val = "-"
2705
        elif field == "disk_template":
2706
          val = instance.disk_template
2707
        elif field == "ip":
2708
          val = instance.nics[0].ip
2709
        elif field == "bridge":
2710
          val = instance.nics[0].bridge
2711
        elif field == "mac":
2712
          val = instance.nics[0].mac
2713
        elif field == "sda_size" or field == "sdb_size":
2714
          disk = instance.FindDisk(field[:3])
2715
          if disk is None:
2716
            val = None
2717
          else:
2718
            val = disk.size
2719
        elif field == "vcpus":
2720
          val = instance.vcpus
2721
        elif field == "tags":
2722
          val = list(instance.GetTags())
2723
        elif field == "serial_no":
2724
          val = instance.serial_no
2725
        elif field in ("network_port", "kernel_path", "initrd_path",
2726
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2727
                       "hvm_cdrom_image_path", "hvm_nic_type",
2728
                       "hvm_disk_type", "vnc_bind_address"):
2729
          val = getattr(instance, field, None)
2730
          if val is not None:
2731
            pass
2732
          elif field in ("hvm_nic_type", "hvm_disk_type",
2733
                         "kernel_path", "initrd_path"):
2734
            val = "default"
2735
          else:
2736
            val = "-"
2737
        else:
2738
          raise errors.ParameterError(field)
2739
        iout.append(val)
2740
      output.append(iout)
2741

    
2742
    return output
2743

    
2744

    
2745
class LUFailoverInstance(LogicalUnit):
2746
  """Failover an instance.
2747

2748
  """
2749
  HPATH = "instance-failover"
2750
  HTYPE = constants.HTYPE_INSTANCE
2751
  _OP_REQP = ["instance_name", "ignore_consistency"]
2752
  REQ_BGL = False
2753

    
2754
  def ExpandNames(self):
2755
    self._ExpandAndLockInstance()
2756
    self.needed_locks[locking.LEVEL_NODE] = []
2757
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2758

    
2759
  def DeclareLocks(self, level):
2760
    if level == locking.LEVEL_NODE:
2761
      self._LockInstancesNodes()
2762

    
2763
  def BuildHooksEnv(self):
2764
    """Build hooks env.
2765

2766
    This runs on master, primary and secondary nodes of the instance.
2767

2768
    """
2769
    env = {
2770
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2771
      }
2772
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2773
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2774
    return env, nl, nl
2775

    
2776
  def CheckPrereq(self):
2777
    """Check prerequisites.
2778

2779
    This checks that the instance is in the cluster.
2780

2781
    """
2782
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2783
    assert self.instance is not None, \
2784
      "Cannot retrieve locked instance %s" % self.op.instance_name
2785

    
2786
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2787
      raise errors.OpPrereqError("Instance's disk layout is not"
2788
                                 " network mirrored, cannot failover.")
2789

    
2790
    secondary_nodes = instance.secondary_nodes
2791
    if not secondary_nodes:
2792
      raise errors.ProgrammerError("no secondary node but using "
2793
                                   "a mirrored disk template")
2794

    
2795
    target_node = secondary_nodes[0]
2796
    # check memory requirements on the secondary node
2797
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2798
                         instance.name, instance.memory)
2799

    
2800
    # check bridge existance
2801
    brlist = [nic.bridge for nic in instance.nics]
2802
    if not rpc.call_bridges_exist(target_node, brlist):
2803
      raise errors.OpPrereqError("One or more target bridges %s does not"
2804
                                 " exist on destination node '%s'" %
2805
                                 (brlist, target_node))
2806

    
2807
  def Exec(self, feedback_fn):
2808
    """Failover an instance.
2809

2810
    The failover is done by shutting it down on its present node and
2811
    starting it on the secondary.
2812

2813
    """
2814
    instance = self.instance
2815

    
2816
    source_node = instance.primary_node
2817
    target_node = instance.secondary_nodes[0]
2818

    
2819
    feedback_fn("* checking disk consistency between source and target")
2820
    for dev in instance.disks:
2821
      # for drbd, these are drbd over lvm
2822
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2823
        if instance.status == "up" and not self.op.ignore_consistency:
2824
          raise errors.OpExecError("Disk %s is degraded on target node,"
2825
                                   " aborting failover." % dev.iv_name)
2826

    
2827
    feedback_fn("* shutting down instance on source node")
2828
    logger.Info("Shutting down instance %s on node %s" %
2829
                (instance.name, source_node))
2830

    
2831
    if not rpc.call_instance_shutdown(source_node, instance):
2832
      if self.op.ignore_consistency:
2833
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2834
                     " anyway. Please make sure node %s is down"  %
2835
                     (instance.name, source_node, source_node))
2836
      else:
2837
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2838
                                 (instance.name, source_node))
2839

    
2840
    feedback_fn("* deactivating the instance's disks on source node")
2841
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2842
      raise errors.OpExecError("Can't shut down the instance's disks.")
2843

    
2844
    instance.primary_node = target_node
2845
    # distribute new instance config to the other nodes
2846
    self.cfg.Update(instance)
2847

    
2848
    # Only start the instance if it's marked as up
2849
    if instance.status == "up":
2850
      feedback_fn("* activating the instance's disks on target node")
2851
      logger.Info("Starting instance %s on node %s" %
2852
                  (instance.name, target_node))
2853

    
2854
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2855
                                               ignore_secondaries=True)
2856
      if not disks_ok:
2857
        _ShutdownInstanceDisks(instance, self.cfg)
2858
        raise errors.OpExecError("Can't activate the instance's disks")
2859

    
2860
      feedback_fn("* starting the instance on the target node")
2861
      if not rpc.call_instance_start(target_node, instance, None):
2862
        _ShutdownInstanceDisks(instance, self.cfg)
2863
        raise errors.OpExecError("Could not start instance %s on node %s." %
2864
                                 (instance.name, target_node))
2865

    
2866

    
2867
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2868
  """Create a tree of block devices on the primary node.
2869

2870
  This always creates all devices.
2871

2872
  """
2873
  if device.children:
2874
    for child in device.children:
2875
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2876
        return False
2877

    
2878
  cfg.SetDiskID(device, node)
2879
  new_id = rpc.call_blockdev_create(node, device, device.size,
2880
                                    instance.name, True, info)
2881
  if not new_id:
2882
    return False
2883
  if device.physical_id is None:
2884
    device.physical_id = new_id
2885
  return True
2886

    
2887

    
2888
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2889
  """Create a tree of block devices on a secondary node.
2890

2891
  If this device type has to be created on secondaries, create it and
2892
  all its children.
2893

2894
  If not, just recurse to children keeping the same 'force' value.
2895

2896
  """
2897
  if device.CreateOnSecondary():
2898
    force = True
2899
  if device.children:
2900
    for child in device.children:
2901
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2902
                                        child, force, info):
2903
        return False
2904

    
2905
  if not force:
2906
    return True
2907
  cfg.SetDiskID(device, node)
2908
  new_id = rpc.call_blockdev_create(node, device, device.size,
2909
                                    instance.name, False, info)
2910
  if not new_id:
2911
    return False
2912
  if device.physical_id is None:
2913
    device.physical_id = new_id
2914
  return True
2915

    
2916

    
2917
def _GenerateUniqueNames(cfg, exts):
2918
  """Generate a suitable LV name.
2919

2920
  This will generate a logical volume name for the given instance.
2921

2922
  """
2923
  results = []
2924
  for val in exts:
2925
    new_id = cfg.GenerateUniqueID()
2926
    results.append("%s%s" % (new_id, val))
2927
  return results
2928

    
2929

    
2930
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2931
                         p_minor, s_minor):
2932
  """Generate a drbd8 device complete with its children.
2933

2934
  """
2935
  port = cfg.AllocatePort()
2936
  vgname = cfg.GetVGName()
2937
  shared_secret = cfg.GenerateDRBDSecret()
2938
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2939
                          logical_id=(vgname, names[0]))
2940
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2941
                          logical_id=(vgname, names[1]))
2942
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2943
                          logical_id=(primary, secondary, port,
2944
                                      p_minor, s_minor,
2945
                                      shared_secret),
2946
                          children=[dev_data, dev_meta],
2947
                          iv_name=iv_name)
2948
  return drbd_dev
2949

    
2950

    
2951
def _GenerateDiskTemplate(cfg, template_name,
2952
                          instance_name, primary_node,
2953
                          secondary_nodes, disk_sz, swap_sz,
2954
                          file_storage_dir, file_driver):
2955
  """Generate the entire disk layout for a given template type.
2956

2957
  """
2958
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
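    # four DRBD minors are needed up front: one per device (sda and sdb)
    # on each of the two nodes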
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
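  # For file-backed instances the storage directory is created first; the
  # individual disks are then created on all secondary nodes before the
  # primary one.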
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
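  # Worked example: a 10240 MB data disk plus 1024 MB of swap needs
  # 11264 MB of free space in the volume group for DT_PLAIN and 11520 MB
  # for DT_DRBD8 (the extra 256 MB covers the two 128 MB DRBD metadata LVs)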
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


class LUCreateInstance(LogicalUnit):
  """Create an instance.
3123

3124
  """
3125
  HPATH = "instance-add"
3126
  HTYPE = constants.HTYPE_INSTANCE
3127
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3128
              "disk_template", "swap_size", "mode", "start", "vcpus",
3129
              "wait_for_sync", "ip_check", "mac"]
3130
  REQ_BGL = False
3131

    
3132
  def _ExpandNode(self, node):
3133
    """Expands and checks one node name.
3134

3135
    """
3136
    node_full = self.cfg.ExpandNodeName(node)
3137
    if node_full is None:
3138
      raise errors.OpPrereqError("Unknown node %s" % node)
3139
    return node_full
3140

    
3141
  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
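    # The allocator is handed the full instance specification (disks,
    # nics, memory, vcpus) and must return exactly as many nodes as the
    # chosen disk template requires; the first one becomes the primary.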
    disks = [{"size": self.op.disk_size, "mode": "w"},
3255
             {"size": self.op.swap_size, "mode": "w"}]
3256
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3257
             "bridge": self.op.bridge}]
3258
    ial = IAllocator(self.cfg,
3259
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3260
                     name=self.op.instance_name,
3261
                     disk_template=self.op.disk_template,
3262
                     tags=[],
3263
                     os=self.op.os_type,
3264
                     vcpus=self.op.vcpus,
3265
                     mem_size=self.op.mem_size,
3266
                     disks=disks,
3267
                     nics=nics,
3268
                     )
3269

    
3270
    ial.Run(self.op.iallocator)
3271

    
3272
    if not ial.success:
3273
      raise errors.OpPrereqError("Can't compute nodes using"
3274
                                 " iallocator '%s': %s" % (self.op.iallocator,
3275
                                                           ial.info))
3276
    if len(ial.nodes) != ial.required_nodes:
3277
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3278
                                 " of nodes (%s), required %s" %
3279
                                 (self.op.iallocator, len(ial.nodes),
3280
                                  ial.required_nodes))
3281
    self.op.pnode = ial.nodes[0]
3282
    logger.ToStdout("Selected nodes for the instance: %s" %
3283
                    (", ".join(ial.nodes),))
3284
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3285
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3286
    if ial.required_nodes == 2:
3287
      self.op.snode = ial.nodes[1]
3288

    
3289
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.
3323

3324
    """
3325
    if (not self.cfg.GetVGName() and
3326
        self.op.disk_template not in constants.DTS_NOT_LVM):
3327
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3328
                                 " instances")
3329

    
3330
    if self.op.mode == constants.INSTANCE_IMPORT:
3331
      src_node = self.op.src_node
3332
      src_path = self.op.src_path
3333

    
3334
      export_info = rpc.call_export_info(src_node, src_path)
3335

    
3336
      if not export_info:
3337
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3338

    
3339
      if not export_info.has_section(constants.INISECT_EXP):
3340
        raise errors.ProgrammerError("Corrupted export config")
3341

    
3342
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3343
      if (int(ei_version) != constants.EXPORT_VERSION):
3344
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3345
                                   (ei_version, constants.EXPORT_VERSION))
3346

    
3347
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3348
        raise errors.OpPrereqError("Can't import instance with more than"
3349
                                   " one data disk")
3350

    
3351
      # FIXME: are the old os-es, disk sizes, etc. useful?
3352
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3353
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3354
                                                         'disk0_dump'))
3355
      self.src_image = diskimage
3356

    
3357
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3358

    
3359
    if self.op.start and not self.op.ip_check:
3360
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3361
                                 " adding an instance in start mode")
3362

    
3363
    if self.op.ip_check:
3364
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3365
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3366
                                   (self.check_ip, instance_name))
3367

    
3368
    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.
3476

3477
    """
3478
    instance = self.op.instance_name
3479
    pnode_name = self.pnode.name
3480

    
3481
    if self.op.mac == "auto":
3482
      mac_address = self.cfg.GenerateMAC()
3483
    else:
3484
      mac_address = self.op.mac
3485

    
3486
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3487
    if self.inst_ip is not None:
3488
      nic.ip = self.inst_ip
3489

    
3490
    ht_kind = self.cfg.GetHypervisorType()
3491
    if ht_kind in constants.HTS_REQ_PORT:
3492
      network_port = self.cfg.AllocatePort()
3493
    else:
3494
      network_port = None
3495

    
3496
    if self.op.vnc_bind_address is None:
3497
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3498

    
3499
    # this is needed because os.path.join does not accept None arguments
3500
    if self.op.file_storage_dir is None:
3501
      string_file_storage_dir = ""
3502
    else:
3503
      string_file_storage_dir = self.op.file_storage_dir
3504

    
3505
    # build the full file storage dir path
3506
    file_storage_dir = os.path.normpath(os.path.join(
3507
                                        self.cfg.GetFileStorageDir(),
3508
                                        string_file_storage_dir, instance))
3509

    
3510

    
3511
    disks = _GenerateDiskTemplate(self.cfg,
3512
                                  self.op.disk_template,
3513
                                  instance, pnode_name,
3514
                                  self.secondaries, self.op.disk_size,
3515
                                  self.op.swap_size,
3516
                                  file_storage_dir,
3517
                                  self.op.file_driver)
3518

    
3519
    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temporary assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

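    # If the user asked to wait, block until the mirrors have fully
    # synced; otherwise only verify that the DRBD devices are not
    # degraded (a short sleep gives the sync a chance to start).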
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image, cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.
3606

3607
  This is somewhat special in that it returns the command line that
3608
  you need to run on the master node in order to connect to the
3609
  console.
3610

3611
  """
3612
  _OP_REQP = ["instance_name"]
3613
  REQ_BGL = False
3614

    
3615
  def ExpandNames(self):
3616
    self._ExpandAndLockInstance()
3617

    
3618
  def CheckPrereq(self):
3619
    """Check prerequisites.
3620

3621
    This checks that the instance is in the cluster.
3622

3623
    """
3624
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3625
    assert self.instance is not None, \
3626
      "Cannot retrieve locked instance %s" % self.op.instance_name
3627

    
3628
  def Exec(self, feedback_fn):
3629
    """Connect to the console of an instance
3630

3631
    """
3632
    instance = self.instance
3633
    node = instance.primary_node
3634

    
3635
    node_insts = rpc.call_instance_list([node])[node]
3636
    if node_insts is False:
3637
      raise errors.OpExecError("Can't connect to node %s." % node)
3638

    
3639
    if instance.name not in node_insts:
3640
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3641

    
3642
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3643

    
3644
    hyper = hypervisor.GetHypervisor(self.cfg)
3645
    console_cmd = hyper.GetShellCommandForConsole(instance)
3646

    
3647
    # build ssh cmdline
3648
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3649

    
3650

    
3651
class LUReplaceDisks(LogicalUnit):
3652
  """Replace the disks of an instance.
3653

3654
  """
3655
  HPATH = "mirrors-replace"
3656
  HTYPE = constants.HTYPE_INSTANCE
3657
  _OP_REQP = ["instance_name", "mode", "disks"]
3658
  REQ_BGL = False
3659

    
3660
  def ExpandNames(self):
3661
    self._ExpandAndLockInstance()
3662

    
3663
    if not hasattr(self.op, "remote_node"):
3664
      self.op.remote_node = None
3665

    
3666
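    # Locking strategy: with an iallocator the new secondary is not known
    # yet, so all nodes are locked; with an explicit remote node only that
    # node is locked here and the instance's own nodes are appended later
    # in DeclareLocks.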
    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev, new_minor in zip(instance.disks, minors):
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
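      # the DRBD logical_id keeps its (node, node, port, minor, minor,
      # secret) layout; only the old secondary and its minor are swapped
      # for the new node and the freshly allocated minor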
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # the temporary minors can now be removed, as the new values have been
    # written to the config file (and are therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self.cfg, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(instance, self.cfg)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.
4192

4193
  """
4194
  HPATH = "disk-grow"
4195
  HTYPE = constants.HTYPE_INSTANCE
4196
  _OP_REQP = ["instance_name", "disk", "amount"]
4197
  REQ_BGL = False
4198

    
4199
  def ExpandNames(self):
4200
    self._ExpandAndLockInstance()
4201
    self.needed_locks[locking.LEVEL_NODE] = []
4202
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4203

    
4204
  def DeclareLocks(self, level):
4205
    if level == locking.LEVEL_NODE:
4206
      self._LockInstancesNodes()
4207

    
4208
  def BuildHooksEnv(self):
4209
    """Build hooks env.
4210

4211
    This runs on the master, the primary and all the secondaries.
4212

4213
    """
4214
    env = {
4215
      "DISK": self.op.disk,
4216
      "AMOUNT": self.op.amount,
4217
      }
4218
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4219
    nl = [
4220
      self.cfg.GetMasterNode(),
4221
      self.instance.primary_node,
4222
      ]
4223
    return env, nl, nl
4224

    
4225
  def CheckPrereq(self):
4226
    """Check prerequisites.
4227

4228
    This checks that the instance is in the cluster.
4229

4230
    """
4231
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4232
    assert instance is not None, \
4233
      "Cannot retrieve locked instance %s" % self.op.instance_name
4234

    
4235
    self.instance = instance
4236

    
4237
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4238
      raise errors.OpPrereqError("Instance's disk layout does not support"
4239
                                 " growing.")
4240

    
4241
    if instance.FindDisk(self.op.disk) is None:
4242
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4243
                                 (self.op.disk, instance.name))
4244

    
4245
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4246
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4247
    for node in nodenames:
4248
      info = nodeinfo.get(node, None)
4249
      if not info:
4250
        raise errors.OpPrereqError("Cannot get current information"
4251
                                   " from node '%s'" % node)
4252
      vg_free = info.get('vg_free', None)
4253
      if not isinstance(vg_free, int):
4254
        raise errors.OpPrereqError("Can't compute free disk space on"
4255
                                   " node %s" % node)
4256
      if self.op.amount > info['vg_free']:
4257
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4258
                                   " %d MiB available, %d MiB required" %
4259
                                   (node, info['vg_free'], self.op.amount))
4260

    
4261
  def Exec(self, feedback_fn):
4262
    """Execute disk grow.
4263

4264
    """
4265
    instance = self.instance
4266
    disk = instance.FindDisk(self.op.disk)
4267
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4268
      self.cfg.SetDiskID(disk, node)
4269
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4270
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4271
        raise errors.OpExecError("grow request failed to node %s" % node)
4272
      elif not result[0]:
4273
        raise errors.OpExecError("grow request failed to node %s: %s" %
4274
                                 (node, result[1]))
4275
    disk.RecordGrow(self.op.amount)
4276
    self.cfg.Update(instance)
4277
    return
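  # Illustrative note (not part of the original code): the per-node RPC above
  # is expected to return a (status, message) pair, e.g. a successful grow of
  # 2048 MiB would look roughly like:
  #
  #   result = rpc.call_blockdev_grow(node, disk, 2048)
  #   # result == (True, "")           -- hypothetical successful reply
  #   # result == (False, "vg full")   -- hypothetical failure reply
  #
  # which is why Exec() checks both the tuple shape and result[0] before
  # trusting result[1] as an error message.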


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
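  # Illustrative sketch (not part of the original code): for a DRBD8 disk
  # backed by two logical volumes the recursion above yields, roughly,
  #
  #   {"iv_name": "sda", "dev_type": constants.LD_DRBD8,
  #    "logical_id": ..., "physical_id": ...,
  #    "pstatus": <call_blockdev_find result on the primary>,
  #    "sstatus": <call_blockdev_find result on the secondary>,
  #    "children": [<dict for the data LV>, <dict for the metadata LV>]}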

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.cfg.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result
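  # Illustrative sketch (not part of the original code): for a single HVM
  # instance the returned mapping looks roughly like
  #
  #   {"instance1.example.com": {
  #      "name": "instance1.example.com",
  #      "config_state": "up", "run_state": "down",
  #      "pnode": "node1.example.com", "snodes": ("node2.example.com",),
  #      "os": "debootstrap", "memory": 512, "vcpus": 1,
  #      "nics": [("aa:00:00:12:34:56", None, "xen-br0")],
  #      "disks": [...],
  #      "vnc_bind_address": "0.0.0.0",
  #      "vnc_console_port": "node1.example.com:11010",
  #      "network_port": 11010}}
  #
  # where the concrete names, MAC, OS and port values are only placeholders.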


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to secondary"
                           " node %s" % node)

    # Xen HVM device type checks
    if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result
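  # Illustrative sketch (not part of the original code): the (name, value)
  # pairs returned above are what the client displays after a modify; a
  # hypothetical call changing memory and bridge would yield roughly
  #
  #   [("mem", 512), ("bridge", "xen-br1")]
  #
  # with one entry per parameter that was actually changed.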


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
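  # Illustrative sketch (not part of the original code): the RPC result is a
  # mapping from node name to the list of export names found on that node,
  # for example (hypothetical names):
  #
  #   {"node1.example.com": ["instance1.example.com"],
  #    "node2.example.com": []}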


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance, cluster_name):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal;
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
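  # Illustrative note (not part of the original code): the tag LUs below all
  # receive the same opcode shape; for instance tags the relevant fields would
  # look roughly like
  #
  #   kind = constants.TAG_INSTANCE
  #   name = "instance1.example.com"     # expanded by ExpandNames above
  #   tags = ["webserver", "critical"]   # hypothetical tag values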


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
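  # Illustrative sketch (not part of the original code): searching for the
  # pattern "^web" on a small cluster might return something like
  #
  #   [("/instances/instance1.example.com", "webserver"),
  #    ("/nodes/node1.example.com", "webfarm")]
  #
  # i.e. one (path, tag) pair per matching tag, with hypothetical names.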


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, mode, name, **kwargs):
    self.cfg = cfg
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.cfg.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.cfg.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
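  # Illustrative sketch (not part of the original code): the structure built
  # above, once serialized, is what the external allocator script receives.
  # A minimal single-node, single-instance input could look roughly like
  #
  #   {"version": 1,
  #    "cluster_name": "cluster.example.com",
  #    "cluster_tags": [], "hypervisor_type": "...",
  #    "nodes": {"node1.example.com": {"total_memory": 4096,
  #                                    "free_memory": 2048, ...}},
  #    "instances": {"instance1.example.com": {"memory": 512, "vcpus": 1,
  #                                            "should_run": True, ...}},
  #    "request": {...}}  # filled in by _AddNewInstance/_AddRelocateInstance
  #
  # with all names and numbers here being placeholders.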

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
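  # Illustrative sketch (not part of the original code): a well-formed reply
  # from an allocator script, as checked above, would deserialize to roughly
  #
  #   {"success": True,
  #    "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node4.example.com"]}
  #
  # i.e. a dict with at least the "success", "info" and "nodes" keys, where
  # "nodes" must be a list (two names for mirrored allocations, one
  # otherwise); the names and message here are placeholders.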


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
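  # Illustrative note (not part of the original code): with
  # direction == constants.IALLOCATOR_DIR_IN the LU only returns the
  # serialized request text (ial.in_text) that would be fed to the allocator,
  # while IALLOCATOR_DIR_OUT actually runs the named allocator script and
  # returns its raw output, which is useful when developing or debugging
  # allocator plugins.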