root / lib / cmdlib.py @ 6b0469d2
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_BGL = True
69

    
70
  def __init__(self, processor, op, context):
71
    """Constructor for LogicalUnit.
72

73
    This needs to be overridden in derived classes in order to check op
74
    validity.
75

76
    """
77
    self.proc = processor
78
    self.op = op
79
    self.cfg = context.cfg
80
    self.context = context
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90

    
91
    for attr_name in self._OP_REQP:
92
      attr_val = getattr(op, attr_name, None)
93
      if attr_val is None:
94
        raise errors.OpPrereqError("Required parameter '%s' missing" %
95
                                   attr_name)
96

    
97
    if not self.cfg.IsCluster():
98
      raise errors.OpPrereqError("Cluster not initialized yet,"
99
                                 " use 'gnt-cluster init' first.")
100
    if self.REQ_MASTER:
101
      master = self.cfg.GetMasterNode()
102
      if master != utils.HostInfo().name:
103
        raise errors.OpPrereqError("Commands must be run on the master"
104
                                   " node %s" % master)
105

    
106
  def __GetSSH(self):
107
    """Returns the SshRunner object
108

109
    """
110
    if not self.__ssh:
111
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
112
    return self.__ssh
113

    
114
  ssh = property(fget=__GetSSH)
115

    
116
  def ExpandNames(self):
117
    """Expand names for this LU.
118

119
    This method is called before starting to execute the opcode, and it should
120
    update all the parameters of the opcode to their canonical form (e.g. a
121
    short node name must be fully expanded after this method has successfully
122
    completed). This way locking, hooks, logging, etc. can work correctly.
123

124
    LUs which implement this method must also populate the self.needed_locks
125
    member, as a dict with lock levels as keys, and a list of needed lock names
126
    as values. Rules:
127
      - Use an empty dict if you don't need any lock
128
      - If you don't need any lock at a particular level omit that level
129
      - Don't put anything for the BGL level
130
      - If you want all locks at a level use locking.ALL_SET as a value
131

132
    If you need to share locks (rather than acquire them exclusively) at one
133
    level you can modify self.share_locks, setting a true value (usually 1) for
134
    that level. By default locks are not shared.
135

136
    Examples:
137
    # Acquire all nodes and one instance
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: locking.ALL_SET,
140
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141
    }
142
    # Acquire just two nodes
143
    self.needed_locks = {
144
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145
    }
146
    # Acquire no locks
147
    self.needed_locks = {} # No, you can't leave it to the default value None
148

149
    """
150
    # The implementation of this method is mandatory only if the new LU is
151
    # concurrent, so that old LUs don't need to be changed all at the same
152
    # time.
153
    if self.REQ_BGL:
154
      self.needed_locks = {} # Exclusive LUs don't need locks.
155
    else:
156
      raise NotImplementedError
157

    
158
  def DeclareLocks(self, level):
159
    """Declare LU locking needs for a level
160

161
    While most LUs can just declare their locking needs at ExpandNames time,
162
    sometimes there's the need to calculate some locks after having acquired
163
    the ones before. This function is called just before acquiring locks at a
164
    particular level, but after acquiring the ones at lower levels, and permits
165
    such calculations. It can be used to modify self.needed_locks, and by
166
    default it does nothing.
167

168
    This function is only called if you have something already set in
169
    self.needed_locks for the level.
170

171
    @param level: Locking level which is going to be locked
172
    @type level: member of ganeti.locking.LEVELS
173

174
    """
175

    
176
  def CheckPrereq(self):
177
    """Check prerequisites for this LU.
178

179
    This method should check that the prerequisites for the execution
180
    of this LU are fulfilled. It can do internode communication, but
181
    it should be idempotent - no cluster or system changes are
182
    allowed.
183

184
    The method should raise errors.OpPrereqError in case something is
185
    not fulfilled. Its return value is ignored.
186

187
    This method should also update all the parameters of the opcode to
188
    their canonical form if it hasn't been done by ExpandNames before.
189

190
    """
191
    raise NotImplementedError
192

    
193
  def Exec(self, feedback_fn):
194
    """Execute the LU.
195

196
    This method should implement the actual work. It should raise
197
    errors.OpExecError for failures that are somewhat dealt with in
198
    code, or expected.
199

200
    """
201
    raise NotImplementedError
202

    
203
  def BuildHooksEnv(self):
204
    """Build hooks environment for this LU.
205

206
    This method should return a three-element tuple consisting of: a dict
207
    containing the environment that will be used for running the
208
    specific hook for this LU, a list of node names on which the hook
209
    should run before the execution, and a list of node names on which
210
    the hook should run after the execution.
211

212
    The keys of the dict must not have the 'GANETI_' prefix, as this will
213
    be handled in the hooks runner. Also note additional keys will be
214
    added by the hooks runner. If the LU doesn't define any
215
    environment, an empty dict (and not None) should be returned.
216

217
    If no nodes are to be returned, an empty list (and not None) should be used.
218

219
    Note that if the HPATH for a LU class is None, this function will
220
    not be called.
221

222
    """
223
    raise NotImplementedError
224

    
225
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226
    """Notify the LU about the results of its hooks.
227

228
    This method is called every time a hooks phase is executed, and notifies
229
    the Logical Unit about the hooks' result. The LU can then use it to alter
230
    its result based on the hooks.  By default the method does nothing and the
231
    previous result is passed back unchanged, but any LU can override it if it
232
    wants to use the local cluster hook-scripts somehow.
233

234
    Args:
235
      phase: the hooks phase that has just been run
236
      hook_results: the results of the multi-node hooks rpc call
237
      feedback_fn: function to send feedback back to the caller
238
      lu_result: the previous result this LU had, or None in the PRE phase.
239

240
    """
241
    return lu_result
242
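  # (LUVerifyCluster.HooksCallBack, further down in this module, is a concrete
  # override of this hook: it folds post-phase hook failures into the
  # cluster-verify output and result.)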

    
243
  def _ExpandAndLockInstance(self):
244
    """Helper function to expand and lock an instance.
245

246
    Many LUs that work on an instance take its name in self.op.instance_name
247
    and need to expand it and then declare the expanded name for locking. This
248
    function does it, and then updates self.op.instance_name to the expanded
249
    name. It also initializes needed_locks as a dict, if this hasn't been done
250
    before.
251

252
    """
253
    if self.needed_locks is None:
254
      self.needed_locks = {}
255
    else:
256
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257
        "_ExpandAndLockInstance called with instance-level locks set"
258
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259
    if expanded_name is None:
260
      raise errors.OpPrereqError("Instance '%s' not known" %
261
                                  self.op.instance_name)
262
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263
    self.op.instance_name = expanded_name
264

    
265
  def _LockInstancesNodes(self, primary_only=False):
266
    """Helper function to declare instances' nodes for locking.
267

268
    This function should be called after locking one or more instances to lock
269
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270
    with all primary or secondary nodes for instances already locked and
271
    present in self.needed_locks[locking.LEVEL_INSTANCE].
272

273
    It should be called from DeclareLocks, and for safety only works if
274
    self.recalculate_locks[locking.LEVEL_NODE] is set.
275

276
    In the future it may grow parameters to just lock some instances' nodes, or
277
    to just lock primaries or secondary nodes, if needed.
278

279
    It should be called from DeclareLocks in a way similar to:
280

281
    if level == locking.LEVEL_NODE:
282
      self._LockInstancesNodes()
283

284
    @type primary_only: boolean
285
    @param primary_only: only lock primary nodes of locked instances
286

287
    """
288
    assert locking.LEVEL_NODE in self.recalculate_locks, \
289
      "_LockInstancesNodes helper function called with no nodes to recalculate"
290

    
291
    # TODO: check if we've really been called with the instance locks held
292

    
293
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294
    # future we might want to have different behaviors depending on the value
295
    # of self.recalculate_locks[locking.LEVEL_NODE]
296
    wanted_nodes = []
297
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298
      instance = self.context.cfg.GetInstanceInfo(instance_name)
299
      wanted_nodes.append(instance.primary_node)
300
      if not primary_only:
301
        wanted_nodes.extend(instance.secondary_nodes)
302

    
303
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
307

    
308
    del self.recalculate_locks[locking.LEVEL_NODE]
309

    
310
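# Illustrative sketch, not part of the original module: one way the locking
# helpers above are typically combined in a concurrent (REQ_BGL = False) LU.
# The class name and the 'instance_name' opcode slot are hypothetical; only
# the LogicalUnit API defined above is assumed.
class LUExampleInstanceOp(LogicalUnit):
  """Example LU operating on one instance and its nodes (sketch only)."""
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expand self.op.instance_name and declare the instance-level lock
    self._ExpandAndLockInstance()
    # node locks can only be computed once the instance lock is held, so
    # declare an empty list and ask for recalculation in DeclareLocks
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # lock the primary and secondary nodes of the locked instance
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("would operate on instance %s" % self.instance.name)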

    
311
class NoHooksLU(LogicalUnit):
312
  """Simple LU which runs no hooks.
313

314
  This LU is intended as a parent for other LogicalUnits which will
315
  run no hooks, in order to reduce duplicate code.
316

317
  """
318
  HPATH = None
319
  HTYPE = None
320

    
321

    
322
def _GetWantedNodes(lu, nodes):
323
  """Returns list of checked and expanded node names.
324

325
  Args:
326
    nodes: List of nodes (strings) or None for all
327

328
  """
329
  if not isinstance(nodes, list):
330
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
331

    
332
  if not nodes:
333
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334
      " non-empty list of nodes whose name is to be expanded.")
335

    
336
  wanted = []
337
  for name in nodes:
338
    node = lu.cfg.ExpandNodeName(name)
339
    if node is None:
340
      raise errors.OpPrereqError("No such node name '%s'" % name)
341
    wanted.append(node)
342

    
343
  return utils.NiceSort(wanted)
344

    
345

    
346
def _GetWantedInstances(lu, instances):
347
  """Returns list of checked and expanded instance names.
348

349
  Args:
350
    instances: List of instances (strings) or None for all
351

352
  """
353
  if not isinstance(instances, list):
354
    raise errors.OpPrereqError("Invalid argument type 'instances'")
355

    
356
  if instances:
357
    wanted = []
358

    
359
    for name in instances:
360
      instance = lu.cfg.ExpandInstanceName(name)
361
      if instance is None:
362
        raise errors.OpPrereqError("No such instance name '%s'" % name)
363
      wanted.append(instance)
364

    
365
  else:
366
    wanted = lu.cfg.GetInstanceList()
367
  return utils.NiceSort(wanted)
368

    
369

    
370
def _CheckOutputFields(static, dynamic, selected):
371
  """Checks whether all selected fields are valid.
372

373
  Args:
374
    static: Static fields
375
    dynamic: Dynamic fields
376

377
  """
378
  static_fields = frozenset(static)
379
  dynamic_fields = frozenset(dynamic)
380

    
381
  all_fields = static_fields | dynamic_fields
382

    
383
  if not all_fields.issuperset(selected):
384
    raise errors.OpPrereqError("Unknown output fields selected: %s"
385
                               % ",".join(frozenset(selected).
386
                                          difference(all_fields)))
387
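# Example, illustrative only (not part of the original file): with
# static=["name"] and dynamic=["mfree"], selecting ["name", "dfree"] makes the
# helper above raise OpPrereqError("Unknown output fields selected: dfree"),
# while ["name", "mfree"] is accepted.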

    
388

    
389
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390
                          memory, vcpus, nics):
391
  """Builds instance related env variables for hooks from single variables.
392

393
  Args:
394
    secondary_nodes: List of secondary nodes as strings
395
  """
396
  env = {
397
    "OP_TARGET": name,
398
    "INSTANCE_NAME": name,
399
    "INSTANCE_PRIMARY": primary_node,
400
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401
    "INSTANCE_OS_TYPE": os_type,
402
    "INSTANCE_STATUS": status,
403
    "INSTANCE_MEMORY": memory,
404
    "INSTANCE_VCPUS": vcpus,
405
  }
406

    
407
  if nics:
408
    nic_count = len(nics)
409
    for idx, (ip, bridge, mac) in enumerate(nics):
410
      if ip is None:
411
        ip = ""
412
      env["INSTANCE_NIC%d_IP" % idx] = ip
413
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415
  else:
416
    nic_count = 0
417

    
418
  env["INSTANCE_NIC_COUNT"] = nic_count
419

    
420
  return env
421
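# Illustrative example, not part of the original source: for a hypothetical
# instance "inst1.example.com" with one NIC, primary node "node1" and
# secondary node "node2", the helper above would return roughly:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1",
#     "INSTANCE_SECONDARIES": "node2",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#     "INSTANCE_NIC_COUNT": 1,
#   }
#
# The GANETI_ prefix is added later by the hooks runner (see BuildHooksEnv).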

    
422

    
423
def _BuildInstanceHookEnvByObject(instance, override=None):
424
  """Builds instance related env variables for hooks from an object.
425

426
  Args:
427
    instance: objects.Instance object of instance
428
    override: dict of values to override
429
  """
430
  args = {
431
    'name': instance.name,
432
    'primary_node': instance.primary_node,
433
    'secondary_nodes': instance.secondary_nodes,
434
    'os_type': instance.os,
435
    'status': instance.status,
436
    'memory': instance.memory,
437
    'vcpus': instance.vcpus,
438
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
439
  }
440
  if override:
441
    args.update(override)
442
  return _BuildInstanceHookEnv(**args)
443

    
444

    
445
def _CheckInstanceBridgesExist(instance):
446
  """Check that the brigdes needed by an instance exist.
447

448
  """
449
  # check bridges existence
450
  brlist = [nic.bridge for nic in instance.nics]
451
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
452
    raise errors.OpPrereqError("one or more target bridges %s does not"
453
                               " exist on destination node '%s'" %
454
                               (brlist, instance.primary_node))
455

    
456

    
457
class LUDestroyCluster(NoHooksLU):
458
  """Logical unit for destroying the cluster.
459

460
  """
461
  _OP_REQP = []
462

    
463
  def CheckPrereq(self):
464
    """Check prerequisites.
465

466
    This checks whether the cluster is empty.
467

468
    Any errors are signalled by raising errors.OpPrereqError.
469

470
    """
471
    master = self.cfg.GetMasterNode()
472

    
473
    nodelist = self.cfg.GetNodeList()
474
    if len(nodelist) != 1 or nodelist[0] != master:
475
      raise errors.OpPrereqError("There are still %d node(s) in"
476
                                 " this cluster." % (len(nodelist) - 1))
477
    instancelist = self.cfg.GetInstanceList()
478
    if instancelist:
479
      raise errors.OpPrereqError("There are still %d instance(s) in"
480
                                 " this cluster." % len(instancelist))
481

    
482
  def Exec(self, feedback_fn):
483
    """Destroys the cluster.
484

485
    """
486
    master = self.cfg.GetMasterNode()
487
    if not rpc.call_node_stop_master(master, False):
488
      raise errors.OpExecError("Could not disable the master role")
489
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
490
    utils.CreateBackup(priv_key)
491
    utils.CreateBackup(pub_key)
492
    return master
493

    
494

    
495
class LUVerifyCluster(LogicalUnit):
496
  """Verifies the cluster status.
497

498
  """
499
  HPATH = "cluster-verify"
500
  HTYPE = constants.HTYPE_CLUSTER
501
  _OP_REQP = ["skip_checks"]
502
  REQ_BGL = False
503

    
504
  def ExpandNames(self):
505
    self.needed_locks = {
506
      locking.LEVEL_NODE: locking.ALL_SET,
507
      locking.LEVEL_INSTANCE: locking.ALL_SET,
508
    }
509
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
510

    
511
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
512
                  remote_version, feedback_fn):
513
    """Run multiple tests against a node.
514

515
    Test list:
516
      - compares ganeti version
517
      - checks vg existence and size > 20G
518
      - checks config file checksum
519
      - checks ssh to other nodes
520

521
    Args:
522
      node: name of the node to check
523
      file_list: required list of files
524
      local_cksum: dictionary of local files and their checksums
525

526
    """
527
    # compares ganeti version
528
    local_version = constants.PROTOCOL_VERSION
529
    if not remote_version:
530
      feedback_fn("  - ERROR: connection to %s failed" % (node))
531
      return True
532

    
533
    if local_version != remote_version:
534
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
535
                      (local_version, node, remote_version))
536
      return True
537

    
538
    # checks vg existence and size > 20G
539

    
540
    bad = False
541
    if not vglist:
542
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
543
                      (node,))
544
      bad = True
545
    else:
546
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
547
                                            constants.MIN_VG_SIZE)
548
      if vgstatus:
549
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
550
        bad = True
551

    
552
    # checks config file checksum
553
    # checks ssh to any
554

    
555
    if 'filelist' not in node_result:
556
      bad = True
557
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
558
    else:
559
      remote_cksum = node_result['filelist']
560
      for file_name in file_list:
561
        if file_name not in remote_cksum:
562
          bad = True
563
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
564
        elif remote_cksum[file_name] != local_cksum[file_name]:
565
          bad = True
566
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
567

    
568
    if 'nodelist' not in node_result:
569
      bad = True
570
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
571
    else:
572
      if node_result['nodelist']:
573
        bad = True
574
        for node in node_result['nodelist']:
575
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
576
                          (node, node_result['nodelist'][node]))
577
    if 'node-net-test' not in node_result:
578
      bad = True
579
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
580
    else:
581
      if node_result['node-net-test']:
582
        bad = True
583
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
584
        for node in nlist:
585
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
586
                          (node, node_result['node-net-test'][node]))
587

    
588
    hyp_result = node_result.get('hypervisor', None)
589
    if hyp_result is not None:
590
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
591
    return bad
592

    
593
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
594
                      node_instance, feedback_fn):
595
    """Verify an instance.
596

597
    This function checks to see if the required block devices are
598
    available on the instance's node.
599

600
    """
601
    bad = False
602

    
603
    node_current = instanceconfig.primary_node
604

    
605
    node_vol_should = {}
606
    instanceconfig.MapLVsByNode(node_vol_should)
607

    
608
    for node in node_vol_should:
609
      for volume in node_vol_should[node]:
610
        if node not in node_vol_is or volume not in node_vol_is[node]:
611
          feedback_fn("  - ERROR: volume %s missing on node %s" %
612
                          (volume, node))
613
          bad = True
614

    
615
    if not instanceconfig.status == 'down':
616
      if (node_current not in node_instance or
617
          not instance in node_instance[node_current]):
618
        feedback_fn("  - ERROR: instance %s not running on node %s" %
619
                        (instance, node_current))
620
        bad = True
621

    
622
    for node in node_instance:
623
      if node != node_current:
624
        if instance in node_instance[node]:
625
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
626
                          (instance, node))
627
          bad = True
628

    
629
    return bad
630

    
631
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
632
    """Verify if there are any unknown volumes in the cluster.
633

634
    The .os, .swap and backup volumes are ignored. All other volumes are
635
    reported as unknown.
636

637
    """
638
    bad = False
639

    
640
    for node in node_vol_is:
641
      for volume in node_vol_is[node]:
642
        if node not in node_vol_should or volume not in node_vol_should[node]:
643
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
644
                      (volume, node))
645
          bad = True
646
    return bad
647

    
648
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
649
    """Verify the list of running instances.
650

651
    This checks what instances are running but unknown to the cluster.
652

653
    """
654
    bad = False
655
    for node in node_instance:
656
      for runninginstance in node_instance[node]:
657
        if runninginstance not in instancelist:
658
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
659
                          (runninginstance, node))
660
          bad = True
661
    return bad
662

    
663
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
664
    """Verify N+1 Memory Resilience.
665

666
    Check that if one single node dies we can still start all the instances it
667
    was primary for.
668

669
    """
670
    bad = False
671

    
672
    for node, nodeinfo in node_info.iteritems():
673
      # This code checks that every node which is now listed as secondary has
674
      # enough memory to host all instances it is supposed to, should a single
675
      # other node in the cluster fail.
676
      # FIXME: not ready for failover to an arbitrary node
677
      # FIXME: does not support file-backed instances
678
      # WARNING: we currently take into account down instances as well as up
679
      # ones, considering that even if they're down someone might want to start
680
      # them even in the event of a node failure.
681
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
682
        needed_mem = 0
683
        for instance in instances:
684
          needed_mem += instance_cfg[instance].memory
685
        if nodeinfo['mfree'] < needed_mem:
686
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
687
                      " failovers should node %s fail" % (node, prinode))
688
          bad = True
689
    return bad
690
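  # Worked example, illustrative only: suppose node2 is secondary for two
  # instances whose primary is node1, needing 512 and 1024 MB of memory.
  # They are grouped under node2's nodeinfo['sinst-by-pnode']['node1'], so
  # needed_mem is 1536 MB; if node2 reports only 1024 MB in 'mfree', the
  # "not enough memory" error above fires, because failing node1 over to
  # node2 could not start all of its instances.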

    
691
  def CheckPrereq(self):
692
    """Check prerequisites.
693

694
    Transform the list of checks we're going to skip into a set and check that
695
    all its members are valid.
696

697
    """
698
    self.skip_set = frozenset(self.op.skip_checks)
699
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
700
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
701

    
702
  def BuildHooksEnv(self):
703
    """Build hooks env.
704

705
    Cluster-Verify hooks are only run in the post phase, and their failure makes
706
    their output be logged in the verify output and the verification fail.
707

708
    """
709
    all_nodes = self.cfg.GetNodeList()
710
    # TODO: populate the environment with useful information for verify hooks
711
    env = {}
712
    return env, [], all_nodes
713

    
714
  def Exec(self, feedback_fn):
715
    """Verify integrity of cluster, performing various test on nodes.
716

717
    """
718
    bad = False
719
    feedback_fn("* Verifying global settings")
720
    for msg in self.cfg.VerifyConfig():
721
      feedback_fn("  - ERROR: %s" % msg)
722

    
723
    vg_name = self.cfg.GetVGName()
724
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
725
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
726
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
727
    i_non_redundant = [] # Non redundant instances
728
    node_volume = {}
729
    node_instance = {}
730
    node_info = {}
731
    instance_cfg = {}
732

    
733
    # FIXME: verify OS list
734
    # do local checksums
735
    file_names = []
736
    file_names.append(constants.SSL_CERT_FILE)
737
    file_names.append(constants.CLUSTER_CONF_FILE)
738
    local_checksums = utils.FingerprintFiles(file_names)
739

    
740
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
741
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
742
    all_instanceinfo = rpc.call_instance_list(nodelist)
743
    all_vglist = rpc.call_vg_list(nodelist)
744
    node_verify_param = {
745
      'filelist': file_names,
746
      'nodelist': nodelist,
747
      'hypervisor': None,
748
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
749
                        for node in nodeinfo]
750
      }
751
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
752
    all_rversion = rpc.call_version(nodelist)
753
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
754

    
755
    for node in nodelist:
756
      feedback_fn("* Verifying node %s" % node)
757
      result = self._VerifyNode(node, file_names, local_checksums,
758
                                all_vglist[node], all_nvinfo[node],
759
                                all_rversion[node], feedback_fn)
760
      bad = bad or result
761

    
762
      # node_volume
763
      volumeinfo = all_volumeinfo[node]
764

    
765
      if isinstance(volumeinfo, basestring):
766
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
767
                    (node, volumeinfo[-400:].encode('string_escape')))
768
        bad = True
769
        node_volume[node] = {}
770
      elif not isinstance(volumeinfo, dict):
771
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
772
        bad = True
773
        continue
774
      else:
775
        node_volume[node] = volumeinfo
776

    
777
      # node_instance
778
      nodeinstance = all_instanceinfo[node]
779
      if type(nodeinstance) != list:
780
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
781
        bad = True
782
        continue
783

    
784
      node_instance[node] = nodeinstance
785

    
786
      # node_info
787
      nodeinfo = all_ninfo[node]
788
      if not isinstance(nodeinfo, dict):
789
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
790
        bad = True
791
        continue
792

    
793
      try:
794
        node_info[node] = {
795
          "mfree": int(nodeinfo['memory_free']),
796
          "dfree": int(nodeinfo['vg_free']),
797
          "pinst": [],
798
          "sinst": [],
799
          # dictionary holding all instances this node is secondary for,
800
          # grouped by their primary node. Each key is a cluster node, and each
801
          # value is a list of instances which have the key as primary and the
802
          # current node as secondary.  this is handy to calculate N+1 memory
803
          # availability if you can only failover from a primary to its
804
          # secondary.
805
          "sinst-by-pnode": {},
806
        }
807
      except ValueError:
808
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
809
        bad = True
810
        continue
811

    
812
    node_vol_should = {}
813

    
814
    for instance in instancelist:
815
      feedback_fn("* Verifying instance %s" % instance)
816
      inst_config = self.cfg.GetInstanceInfo(instance)
817
      result =  self._VerifyInstance(instance, inst_config, node_volume,
818
                                     node_instance, feedback_fn)
819
      bad = bad or result
820

    
821
      inst_config.MapLVsByNode(node_vol_should)
822

    
823
      instance_cfg[instance] = inst_config
824

    
825
      pnode = inst_config.primary_node
826
      if pnode in node_info:
827
        node_info[pnode]['pinst'].append(instance)
828
      else:
829
        feedback_fn("  - ERROR: instance %s, connection to primary node"
830
                    " %s failed" % (instance, pnode))
831
        bad = True
832

    
833
      # If the instance is non-redundant we cannot survive losing its primary
834
      # node, so we are not N+1 compliant. On the other hand we have no disk
835
      # templates with more than one secondary so that situation is not well
836
      # supported either.
837
      # FIXME: does not support file-backed instances
838
      if len(inst_config.secondary_nodes) == 0:
839
        i_non_redundant.append(instance)
840
      elif len(inst_config.secondary_nodes) > 1:
841
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
842
                    % instance)
843

    
844
      for snode in inst_config.secondary_nodes:
845
        if snode in node_info:
846
          node_info[snode]['sinst'].append(instance)
847
          if pnode not in node_info[snode]['sinst-by-pnode']:
848
            node_info[snode]['sinst-by-pnode'][pnode] = []
849
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
850
        else:
851
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
852
                      " %s failed" % (instance, snode))
853

    
854
    feedback_fn("* Verifying orphan volumes")
855
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
856
                                       feedback_fn)
857
    bad = bad or result
858

    
859
    feedback_fn("* Verifying remaining instances")
860
    result = self._VerifyOrphanInstances(instancelist, node_instance,
861
                                         feedback_fn)
862
    bad = bad or result
863

    
864
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
865
      feedback_fn("* Verifying N+1 Memory redundancy")
866
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
867
      bad = bad or result
868

    
869
    feedback_fn("* Other Notes")
870
    if i_non_redundant:
871
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
872
                  % len(i_non_redundant))
873

    
874
    return not bad
875

    
876
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
877
    """Analize the post-hooks' result, handle it, and send some
878
    nicely-formatted feedback back to the user.
879

880
    Args:
881
      phase: the hooks phase that has just been run
882
      hooks_results: the results of the multi-node hooks rpc call
883
      feedback_fn: function to send feedback back to the caller
884
      lu_result: previous Exec result
885

886
    """
887
    # We only really run POST phase hooks, and are only interested in
888
    # their results
889
    if phase == constants.HOOKS_PHASE_POST:
890
      # Used to change hooks' output to proper indentation
891
      indent_re = re.compile('^', re.M)
892
      feedback_fn("* Hooks Results")
893
      if not hooks_results:
894
        feedback_fn("  - ERROR: general communication failure")
895
        lu_result = 1
896
      else:
897
        for node_name in hooks_results:
898
          show_node_header = True
899
          res = hooks_results[node_name]
900
          if res is False or not isinstance(res, list):
901
            feedback_fn("    Communication failure")
902
            lu_result = 1
903
            continue
904
          for script, hkr, output in res:
905
            if hkr == constants.HKR_FAIL:
906
              # The node header is only shown once, if there are
907
              # failing hooks on that node
908
              if show_node_header:
909
                feedback_fn("  Node %s:" % node_name)
910
                show_node_header = False
911
              feedback_fn("    ERROR: Script %s failed, output:" % script)
912
              output = indent_re.sub('      ', output)
913
              feedback_fn("%s" % output)
914
              lu_result = 1
915

    
916
      return lu_result
917

    
918

    
919
class LUVerifyDisks(NoHooksLU):
920
  """Verifies the cluster disks status.
921

922
  """
923
  _OP_REQP = []
924
  REQ_BGL = False
925

    
926
  def ExpandNames(self):
927
    self.needed_locks = {
928
      locking.LEVEL_NODE: locking.ALL_SET,
929
      locking.LEVEL_INSTANCE: locking.ALL_SET,
930
    }
931
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
932

    
933
  def CheckPrereq(self):
934
    """Check prerequisites.
935

936
    This has no prerequisites.
937

938
    """
939
    pass
940

    
941
  def Exec(self, feedback_fn):
942
    """Verify integrity of cluster disks.
943

944
    """
945
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
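    # The four members of the result tuple are, in order: nodes that could
    # not be contacted, per-node LVM error messages, instances with at least
    # one offline (inactive) logical volume, and a map from instance name to
    # the (node, volume) pairs that are missing altogether.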
946

    
947
    vg_name = self.cfg.GetVGName()
948
    nodes = utils.NiceSort(self.cfg.GetNodeList())
949
    instances = [self.cfg.GetInstanceInfo(name)
950
                 for name in self.cfg.GetInstanceList()]
951

    
952
    nv_dict = {}
953
    for inst in instances:
954
      inst_lvs = {}
955
      if (inst.status != "up" or
956
          inst.disk_template not in constants.DTS_NET_MIRROR):
957
        continue
958
      inst.MapLVsByNode(inst_lvs)
959
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
960
      for node, vol_list in inst_lvs.iteritems():
961
        for vol in vol_list:
962
          nv_dict[(node, vol)] = inst
963

    
964
    if not nv_dict:
965
      return result
966

    
967
    node_lvs = rpc.call_volume_list(nodes, vg_name)
968

    
969
    to_act = set()
970
    for node in nodes:
971
      # node_volume
972
      lvs = node_lvs[node]
973

    
974
      if isinstance(lvs, basestring):
975
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
976
        res_nlvm[node] = lvs
977
      elif not isinstance(lvs, dict):
978
        logger.Info("connection to node %s failed or invalid data returned" %
979
                    (node,))
980
        res_nodes.append(node)
981
        continue
982

    
983
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
984
        inst = nv_dict.pop((node, lv_name), None)
985
        if (not lv_online and inst is not None
986
            and inst.name not in res_instances):
987
          res_instances.append(inst.name)
988

    
989
    # any leftover items in nv_dict are missing LVs, let's arrange the
990
    # data better
991
    for key, inst in nv_dict.iteritems():
992
      if inst.name not in res_missing:
993
        res_missing[inst.name] = []
994
      res_missing[inst.name].append(key)
995

    
996
    return result
997

    
998

    
999
class LURenameCluster(LogicalUnit):
1000
  """Rename the cluster.
1001

1002
  """
1003
  HPATH = "cluster-rename"
1004
  HTYPE = constants.HTYPE_CLUSTER
1005
  _OP_REQP = ["name"]
1006

    
1007
  def BuildHooksEnv(self):
1008
    """Build hooks env.
1009

1010
    """
1011
    env = {
1012
      "OP_TARGET": self.cfg.GetClusterName(),
1013
      "NEW_NAME": self.op.name,
1014
      }
1015
    mn = self.cfg.GetMasterNode()
1016
    return env, [mn], [mn]
1017

    
1018
  def CheckPrereq(self):
1019
    """Verify that the passed name is a valid one.
1020

1021
    """
1022
    hostname = utils.HostInfo(self.op.name)
1023

    
1024
    new_name = hostname.name
1025
    self.ip = new_ip = hostname.ip
1026
    old_name = self.cfg.GetClusterName()
1027
    old_ip = self.cfg.GetMasterIP()
1028
    if new_name == old_name and new_ip == old_ip:
1029
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1030
                                 " cluster has changed")
1031
    if new_ip != old_ip:
1032
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1033
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1034
                                   " reachable on the network. Aborting." %
1035
                                   new_ip)
1036

    
1037
    self.op.name = new_name
1038

    
1039
  def Exec(self, feedback_fn):
1040
    """Rename the cluster.
1041

1042
    """
1043
    clustername = self.op.name
1044
    ip = self.ip
1045

    
1046
    # shutdown the master IP
1047
    master = self.cfg.GetMasterNode()
1048
    if not rpc.call_node_stop_master(master, False):
1049
      raise errors.OpExecError("Could not disable the master role")
1050

    
1051
    try:
1052
      # modify the sstore
1053
      # TODO: sstore
1054
      ss.SetKey(ss.SS_MASTER_IP, ip)
1055
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1056

    
1057
      # Distribute updated ss config to all nodes
1058
      myself = self.cfg.GetNodeInfo(master)
1059
      dist_nodes = self.cfg.GetNodeList()
1060
      if myself.name in dist_nodes:
1061
        dist_nodes.remove(myself.name)
1062

    
1063
      logger.Debug("Copying updated ssconf data to all nodes")
1064
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1065
        fname = ss.KeyToFilename(keyname)
1066
        result = rpc.call_upload_file(dist_nodes, fname)
1067
        for to_node in dist_nodes:
1068
          if not result[to_node]:
1069
            logger.Error("copy of file %s to node %s failed" %
1070
                         (fname, to_node))
1071
    finally:
1072
      if not rpc.call_node_start_master(master, False):
1073
        logger.Error("Could not re-enable the master role on the master,"
1074
                     " please restart manually.")
1075

    
1076

    
1077
def _RecursiveCheckIfLVMBased(disk):
1078
  """Check if the given disk or its children are lvm-based.
1079

1080
  Args:
1081
    disk: ganeti.objects.Disk object
1082

1083
  Returns:
1084
    boolean indicating whether a LD_LV dev_type was found or not
1085

1086
  """
1087
  if disk.children:
1088
    for chdisk in disk.children:
1089
      if _RecursiveCheckIfLVMBased(chdisk):
1090
        return True
1091
  return disk.dev_type == constants.LD_LV
1092

    
1093

    
1094
class LUSetClusterParams(LogicalUnit):
1095
  """Change the parameters of the cluster.
1096

1097
  """
1098
  HPATH = "cluster-modify"
1099
  HTYPE = constants.HTYPE_CLUSTER
1100
  _OP_REQP = []
1101
  REQ_BGL = False
1102

    
1103
  def ExpandNames(self):
1104
    # FIXME: in the future maybe other cluster params won't require checking on
1105
    # all nodes to be modified.
1106
    self.needed_locks = {
1107
      locking.LEVEL_NODE: locking.ALL_SET,
1108
    }
1109
    self.share_locks[locking.LEVEL_NODE] = 1
1110

    
1111
  def BuildHooksEnv(self):
1112
    """Build hooks env.
1113

1114
    """
1115
    env = {
1116
      "OP_TARGET": self.cfg.GetClusterName(),
1117
      "NEW_VG_NAME": self.op.vg_name,
1118
      }
1119
    mn = self.cfg.GetMasterNode()
1120
    return env, [mn], [mn]
1121

    
1122
  def CheckPrereq(self):
1123
    """Check prerequisites.
1124

1125
    This checks whether the given params don't conflict and
1126
    if the given volume group is valid.
1127

1128
    """
1129
    # FIXME: This only works because there is only one parameter that can be
1130
    # changed or removed.
1131
    if not self.op.vg_name:
1132
      instances = self.cfg.GetAllInstancesInfo().values()
1133
      for inst in instances:
1134
        for disk in inst.disks:
1135
          if _RecursiveCheckIfLVMBased(disk):
1136
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1137
                                       " lvm-based instances exist")
1138

    
1139
    # if vg_name not None, checks given volume group on all nodes
1140
    if self.op.vg_name:
1141
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1142
      vglist = rpc.call_vg_list(node_list)
1143
      for node in node_list:
1144
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1145
                                              constants.MIN_VG_SIZE)
1146
        if vgstatus:
1147
          raise errors.OpPrereqError("Error on node '%s': %s" %
1148
                                     (node, vgstatus))
1149

    
1150
  def Exec(self, feedback_fn):
1151
    """Change the parameters of the cluster.
1152

1153
    """
1154
    if self.op.vg_name != self.cfg.GetVGName():
1155
      self.cfg.SetVGName(self.op.vg_name)
1156
    else:
1157
      feedback_fn("Cluster LVM configuration already in desired"
1158
                  " state, not changing")
1159

    
1160

    
1161
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1162
  """Sleep and poll for an instance's disk to sync.
1163

1164
  """
1165
  if not instance.disks:
1166
    return True
1167

    
1168
  if not oneshot:
1169
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1170

    
1171
  node = instance.primary_node
1172

    
1173
  for dev in instance.disks:
1174
    cfgw.SetDiskID(dev, node)
1175

    
1176
  retries = 0
1177
  while True:
1178
    max_time = 0
1179
    done = True
1180
    cumul_degraded = False
1181
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1182
    if not rstats:
1183
      proc.LogWarning("Can't get any data from node %s" % node)
1184
      retries += 1
1185
      if retries >= 10:
1186
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1187
                                 " aborting." % node)
1188
      time.sleep(6)
1189
      continue
1190
    retries = 0
1191
    for i in range(len(rstats)):
1192
      mstat = rstats[i]
1193
      if mstat is None:
1194
        proc.LogWarning("Can't compute data for node %s/%s" %
1195
                        (node, instance.disks[i].iv_name))
1196
        continue
1197
      # we ignore the ldisk parameter
1198
      perc_done, est_time, is_degraded, _ = mstat
1199
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1200
      if perc_done is not None:
1201
        done = False
1202
        if est_time is not None:
1203
          rem_time = "%d estimated seconds remaining" % est_time
1204
          max_time = est_time
1205
        else:
1206
          rem_time = "no time estimate"
1207
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1208
                     (instance.disks[i].iv_name, perc_done, rem_time))
1209
    if done or oneshot:
1210
      break
1211

    
1212
    time.sleep(min(60, max_time))
1213

    
1214
  if done:
1215
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1216
  return not cumul_degraded
1217

    
1218

    
1219
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1220
  """Check that mirrors are not degraded.
1221

1222
  The ldisk parameter, if True, will change the test from the
1223
  is_degraded attribute (which represents overall non-ok status for
1224
  the device(s)) to the ldisk (representing the local storage status).
1225

1226
  """
1227
  cfgw.SetDiskID(dev, node)
1228
  if ldisk:
1229
    idx = 6
1230
  else:
1231
    idx = 5
1232

    
1233
  result = True
1234
  if on_primary or dev.AssembleOnSecondary():
1235
    rstats = rpc.call_blockdev_find(node, dev)
1236
    if not rstats:
1237
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1238
      result = False
1239
    else:
1240
      result = result and (not rstats[idx])
1241
  if dev.children:
1242
    for child in dev.children:
1243
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1244

    
1245
  return result
1246
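# Illustrative usage sketch, not part of the original source: a caller that
# needs every disk of an instance to be locally healthy on one node, e.g.
# before touching the other replica, could do something along these lines
# ("instance", "node" and "cfg" are assumed to exist in the calling LU):
#
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(cfg, dev, node, False, ldisk=True):
#       raise errors.OpExecError("Disk %s on node %s is degraded" %
#                                (dev.iv_name, node))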

    
1247

    
1248
class LUDiagnoseOS(NoHooksLU):
1249
  """Logical unit for OS diagnose/query.
1250

1251
  """
1252
  _OP_REQP = ["output_fields", "names"]
1253
  REQ_BGL = False
1254

    
1255
  def ExpandNames(self):
1256
    if self.op.names:
1257
      raise errors.OpPrereqError("Selective OS query not supported")
1258

    
1259
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1260
    _CheckOutputFields(static=[],
1261
                       dynamic=self.dynamic_fields,
1262
                       selected=self.op.output_fields)
1263

    
1264
    # Lock all nodes, in shared mode
1265
    self.needed_locks = {}
1266
    self.share_locks[locking.LEVEL_NODE] = 1
1267
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1268

    
1269
  def CheckPrereq(self):
1270
    """Check prerequisites.
1271

1272
    """
1273

    
1274
  @staticmethod
1275
  def _DiagnoseByOS(node_list, rlist):
1276
    """Remaps a per-node return list into an a per-os per-node dictionary
1277

1278
      Args:
1279
        node_list: a list with the names of all nodes
1280
        rlist: a map with node names as keys and OS objects as values
1281

1282
      Returns:
1283
        map: a map with osnames as keys and as value another map, with
1284
             nodes as
1285
             keys and list of OS objects as values
1286
             e.g. {"debian-etch": {"node1": [<object>,...],
1287
                                   "node2": [<object>,]}
1288
                  }
1289

1290
    """
1291
    all_os = {}
1292
    for node_name, nr in rlist.iteritems():
1293
      if not nr:
1294
        continue
1295
      for os_obj in nr:
1296
        if os_obj.name not in all_os:
1297
          # build a list of nodes for this os containing empty lists
1298
          # for each node in node_list
1299
          all_os[os_obj.name] = {}
1300
          for nname in node_list:
1301
            all_os[os_obj.name][nname] = []
1302
        all_os[os_obj.name][node_name].append(os_obj)
1303
    return all_os
1304

    
1305
  def Exec(self, feedback_fn):
1306
    """Compute the list of OSes.
1307

1308
    """
1309
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1310
    node_data = rpc.call_os_diagnose(node_list)
1311
    if node_data == False:
1312
      raise errors.OpExecError("Can't gather the list of OSes")
1313
    pol = self._DiagnoseByOS(node_list, node_data)
1314
    output = []
1315
    for os_name, os_data in pol.iteritems():
1316
      row = []
1317
      for field in self.op.output_fields:
1318
        if field == "name":
1319
          val = os_name
1320
        elif field == "valid":
1321
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1322
        elif field == "node_status":
1323
          val = {}
1324
          for node_name, nos_list in os_data.iteritems():
1325
            val[node_name] = [(v.status, v.path) for v in nos_list]
1326
        else:
1327
          raise errors.ParameterError(field)
1328
        row.append(val)
1329
      output.append(row)
1330

    
1331
    return output
1332

    
1333

    
1334
class LURemoveNode(LogicalUnit):
1335
  """Logical unit for removing a node.
1336

1337
  """
1338
  HPATH = "node-remove"
1339
  HTYPE = constants.HTYPE_NODE
1340
  _OP_REQP = ["node_name"]
1341

    
1342
  def BuildHooksEnv(self):
1343
    """Build hooks env.
1344

1345
    This doesn't run on the target node in the pre phase as a failed
1346
    node would then be impossible to remove.
1347

1348
    """
1349
    env = {
1350
      "OP_TARGET": self.op.node_name,
1351
      "NODE_NAME": self.op.node_name,
1352
      }
1353
    all_nodes = self.cfg.GetNodeList()
1354
    all_nodes.remove(self.op.node_name)
1355
    return env, all_nodes, all_nodes
1356

    
1357
  def CheckPrereq(self):
1358
    """Check prerequisites.
1359

1360
    This checks:
1361
     - the node exists in the configuration
1362
     - it does not have primary or secondary instances
1363
     - it's not the master
1364

1365
    Any errors are signalled by raising errors.OpPrereqError.
1366

1367
    """
1368
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1369
    if node is None:
1370
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1371

    
1372
    instance_list = self.cfg.GetInstanceList()
1373

    
1374
    masternode = self.cfg.GetMasterNode()
1375
    if node.name == masternode:
1376
      raise errors.OpPrereqError("Node is the master node,"
1377
                                 " you need to failover first.")
1378

    
1379
    for instance_name in instance_list:
1380
      instance = self.cfg.GetInstanceInfo(instance_name)
1381
      if node.name == instance.primary_node:
1382
        raise errors.OpPrereqError("Instance %s still running on the node,"
1383
                                   " please remove first." % instance_name)
1384
      if node.name in instance.secondary_nodes:
1385
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1386
                                   " please remove first." % instance_name)
1387
    self.op.node_name = node.name
1388
    self.node = node
1389

    
1390
  def Exec(self, feedback_fn):
1391
    """Removes the node from the cluster.
1392

1393
    """
1394
    node = self.node
1395
    logger.Info("stopping the node daemon and removing configs from node %s" %
1396
                node.name)
1397

    
1398
    self.context.RemoveNode(node.name)
1399

    
1400
    rpc.call_node_leave_cluster(node.name)
1401

    
1402

    
1403
class LUQueryNodes(NoHooksLU):
1404
  """Logical unit for querying nodes.
1405

1406
  """
1407
  _OP_REQP = ["output_fields", "names"]
1408
  REQ_BGL = False
1409

    
1410
  def ExpandNames(self):
1411
    self.dynamic_fields = frozenset([
1412
      "dtotal", "dfree",
1413
      "mtotal", "mnode", "mfree",
1414
      "bootid",
1415
      "ctotal",
1416
      ])
1417

    
1418
    self.static_fields = frozenset([
1419
      "name", "pinst_cnt", "sinst_cnt",
1420
      "pinst_list", "sinst_list",
1421
      "pip", "sip", "tags",
1422
      "serial_no",
1423
      ])
1424

    
1425
    _CheckOutputFields(static=self.static_fields,
1426
                       dynamic=self.dynamic_fields,
1427
                       selected=self.op.output_fields)
1428

    
1429
    self.needed_locks = {}
1430
    self.share_locks[locking.LEVEL_NODE] = 1
1431

    
1432
    if self.op.names:
1433
      self.wanted = _GetWantedNodes(self, self.op.names)
1434
    else:
1435
      self.wanted = locking.ALL_SET
1436

    
1437
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1438
    if self.do_locking:
1439
      # if we don't request only static fields, we need to lock the nodes
1440
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1441
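    # Illustrative note, not in the original source: a query for purely
    # static fields such as ["name", "pinst_cnt"] therefore needs no node
    # locks and is answered from the configuration alone, while adding a
    # dynamic field such as "mfree" acquires the node locks above so the
    # live data gathered in Exec matches the node list.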

    
1442

    
1443
  def CheckPrereq(self):
1444
    """Check prerequisites.
1445

1446
    """
1447
    # The validation of the node list is done in _GetWantedNodes,
1448
    # if non-empty, and if empty, there's no validation to do
1449
    pass
1450

    
1451
  def Exec(self, feedback_fn):
1452
    """Computes the list of nodes and their attributes.
1453

1454
    """
1455
    all_info = self.cfg.GetAllNodesInfo()
1456
    if self.do_locking:
1457
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1458
    elif self.wanted != locking.ALL_SET:
1459
      nodenames = self.wanted
1460
      missing = set(nodenames).difference(all_info.keys())
1461
      if missing:
1462
        raise errors.OpExecError(
1463
          "Some nodes were removed before retrieving their data: %s" % missing)
1464
    else:
1465
      nodenames = all_info.keys()
1466
    nodelist = [all_info[name] for name in nodenames]
1467

    
1468
    # begin data gathering
1469

    
1470
    if self.dynamic_fields.intersection(self.op.output_fields):
1471
      live_data = {}
1472
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1473
      for name in nodenames:
1474
        nodeinfo = node_data.get(name, None)
1475
        if nodeinfo:
1476
          live_data[name] = {
1477
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1478
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1479
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1480
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1481
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1482
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1483
            "bootid": nodeinfo['bootid'],
1484
            }
1485
        else:
1486
          live_data[name] = {}
1487
    else:
1488
      live_data = dict.fromkeys(nodenames, {})
1489

    
1490
    node_to_primary = dict([(name, set()) for name in nodenames])
1491
    node_to_secondary = dict([(name, set()) for name in nodenames])
1492

    
1493
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1494
                             "sinst_cnt", "sinst_list"))
1495
    if inst_fields & frozenset(self.op.output_fields):
1496
      instancelist = self.cfg.GetInstanceList()
1497

    
1498
      for instance_name in instancelist:
1499
        inst = self.cfg.GetInstanceInfo(instance_name)
1500
        if inst.primary_node in node_to_primary:
1501
          node_to_primary[inst.primary_node].add(inst.name)
1502
        for secnode in inst.secondary_nodes:
1503
          if secnode in node_to_secondary:
1504
            node_to_secondary[secnode].add(inst.name)
1505

    
1506
    # end data gathering
1507

    
1508
    output = []
1509
    for node in nodelist:
1510
      node_output = []
1511
      for field in self.op.output_fields:
1512
        if field == "name":
1513
          val = node.name
1514
        elif field == "pinst_list":
1515
          val = list(node_to_primary[node.name])
1516
        elif field == "sinst_list":
1517
          val = list(node_to_secondary[node.name])
1518
        elif field == "pinst_cnt":
1519
          val = len(node_to_primary[node.name])
1520
        elif field == "sinst_cnt":
1521
          val = len(node_to_secondary[node.name])
1522
        elif field == "pip":
1523
          val = node.primary_ip
1524
        elif field == "sip":
1525
          val = node.secondary_ip
1526
        elif field == "tags":
1527
          val = list(node.GetTags())
1528
        elif field == "serial_no":
1529
          val = node.serial_no
1530
        elif field in self.dynamic_fields:
1531
          val = live_data[node.name].get(field, None)
1532
        else:
1533
          raise errors.ParameterError(field)
1534
        node_output.append(val)
1535
      output.append(node_output)
1536

    
1537
    return output
1538

    
1539

    
1540
class LUQueryNodeVolumes(NoHooksLU):
1541
  """Logical unit for getting volumes on node(s).
1542

1543
  """
1544
  _OP_REQP = ["nodes", "output_fields"]
1545
  REQ_BGL = False
1546

    
1547
  def ExpandNames(self):
1548
    _CheckOutputFields(static=["node"],
1549
                       dynamic=["phys", "vg", "name", "size", "instance"],
1550
                       selected=self.op.output_fields)
1551

    
1552
    self.needed_locks = {}
1553
    self.share_locks[locking.LEVEL_NODE] = 1
1554
    if not self.op.nodes:
1555
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1556
    else:
1557
      self.needed_locks[locking.LEVEL_NODE] = \
1558
        _GetWantedNodes(self, self.op.nodes)
1559

    
1560
  def CheckPrereq(self):
1561
    """Check prerequisites.
1562

1563
    This checks that the fields required are valid output fields.
1564

1565
    """
1566
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1567

    
1568
  def Exec(self, feedback_fn):
1569
    """Computes the list of nodes and their attributes.
1570

1571
    """
1572
    nodenames = self.nodes
1573
    volumes = rpc.call_node_volumes(nodenames)
1574

    
1575
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1576
             in self.cfg.GetInstanceList()]
1577

    
1578
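    # Map each instance to its per-node LV lists so that a volume found on a
    # node can later be matched back to the instance owning it.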
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1579

    
1580
    output = []
1581
    for node in nodenames:
1582
      if node not in volumes or not volumes[node]:
1583
        continue
1584

    
1585
      node_vols = volumes[node][:]
1586
      node_vols.sort(key=lambda vol: vol['dev'])
1587

    
1588
      for vol in node_vols:
1589
        node_output = []
1590
        for field in self.op.output_fields:
1591
          if field == "node":
1592
            val = node
1593
          elif field == "phys":
1594
            val = vol['dev']
1595
          elif field == "vg":
1596
            val = vol['vg']
1597
          elif field == "name":
1598
            val = vol['name']
1599
          elif field == "size":
1600
            val = int(float(vol['size']))
1601
          elif field == "instance":
1602
            for inst in ilist:
1603
              if node not in lv_by_node[inst]:
1604
                continue
1605
              if vol['name'] in lv_by_node[inst][node]:
1606
                val = inst.name
1607
                break
1608
            else:
1609
              val = '-'
1610
          else:
1611
            raise errors.ParameterError(field)
1612
          node_output.append(str(val))
1613

    
1614
        output.append(node_output)
1615

    
1616
    return output
1617

    
1618

    
1619
class LUAddNode(LogicalUnit):
1620
  """Logical unit for adding node to the cluster.
1621

1622
  """
1623
  HPATH = "node-add"
1624
  HTYPE = constants.HTYPE_NODE
1625
  _OP_REQP = ["node_name"]
1626

    
1627
  def BuildHooksEnv(self):
1628
    """Build hooks env.
1629

1630
    This will run on all nodes before, and on all nodes + the new node after.
1631

1632
    """
1633
    env = {
1634
      "OP_TARGET": self.op.node_name,
1635
      "NODE_NAME": self.op.node_name,
1636
      "NODE_PIP": self.op.primary_ip,
1637
      "NODE_SIP": self.op.secondary_ip,
1638
      }
1639
    nodes_0 = self.cfg.GetNodeList()
1640
    nodes_1 = nodes_0 + [self.op.node_name, ]
1641
    return env, nodes_0, nodes_1
1642

    
1643
  def CheckPrereq(self):
1644
    """Check prerequisites.
1645

1646
    This checks:
1647
     - the new node is not already in the config
1648
     - it is resolvable
1649
     - its parameters (single/dual homed) matches the cluster
1650

1651
    Any errors are signalled by raising errors.OpPrereqError.
1652

1653
    """
1654
    node_name = self.op.node_name
1655
    cfg = self.cfg
1656

    
1657
    dns_data = utils.HostInfo(node_name)
1658

    
1659
    node = dns_data.name
1660
    primary_ip = self.op.primary_ip = dns_data.ip
1661
    secondary_ip = getattr(self.op, "secondary_ip", None)
1662
    if secondary_ip is None:
1663
      secondary_ip = primary_ip
1664
    if not utils.IsValidIP(secondary_ip):
1665
      raise errors.OpPrereqError("Invalid secondary IP given")
1666
    self.op.secondary_ip = secondary_ip
1667

    
1668
    node_list = cfg.GetNodeList()
1669
    if not self.op.readd and node in node_list:
1670
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1671
                                 node)
1672
    elif self.op.readd and node not in node_list:
1673
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1674

    
1675
    for existing_node_name in node_list:
1676
      existing_node = cfg.GetNodeInfo(existing_node_name)
1677

    
1678
      if self.op.readd and node == existing_node_name:
1679
        if (existing_node.primary_ip != primary_ip or
1680
            existing_node.secondary_ip != secondary_ip):
1681
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1682
                                     " address configuration as before")
1683
        continue
1684

    
1685
      if (existing_node.primary_ip == primary_ip or
1686
          existing_node.secondary_ip == primary_ip or
1687
          existing_node.primary_ip == secondary_ip or
1688
          existing_node.secondary_ip == secondary_ip):
1689
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1690
                                   " existing node %s" % existing_node.name)
1691

    
1692
    # check that the type of the node (single versus dual homed) is the
1693
    # same as for the master
1694
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1695
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1696
    newbie_singlehomed = secondary_ip == primary_ip
1697
    if master_singlehomed != newbie_singlehomed:
1698
      if master_singlehomed:
1699
        raise errors.OpPrereqError("The master has no private ip but the"
1700
                                   " new node has one")
1701
      else:
1702
        raise errors.OpPrereqError("The master has a private ip but the"
1703
                                   " new node doesn't have one")
1704

    
1705
    # checks reachability
1706
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1707
      raise errors.OpPrereqError("Node not reachable by ping")
1708

    
1709
    if not newbie_singlehomed:
1710
      # check reachability from my secondary ip to newbie's secondary ip
1711
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1712
                           source=myself.secondary_ip):
1713
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1714
                                   " based ping to noded port")
1715

    
1716
    self.new_node = objects.Node(name=node,
1717
                                 primary_ip=primary_ip,
1718
                                 secondary_ip=secondary_ip)
1719

    
1720
  def Exec(self, feedback_fn):
1721
    """Adds the new node to the cluster.
1722

1723
    """
1724
    new_node = self.new_node
1725
    node = new_node.name
1726

    
1727
    # check connectivity
1728
    result = rpc.call_version([node])[node]
1729
    if result:
1730
      if constants.PROTOCOL_VERSION == result:
1731
        logger.Info("communication to node %s fine, sw version %s match" %
1732
                    (node, result))
1733
      else:
1734
        raise errors.OpExecError("Version mismatch master version %s,"
1735
                                 " node version %s" %
1736
                                 (constants.PROTOCOL_VERSION, result))
1737
    else:
1738
      raise errors.OpExecError("Cannot get version from the new node")
1739

    
1740
    # setup ssh on node
1741
    logger.Info("copy ssh key to node %s" % node)
1742
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1743
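    # Copy the SSH host key pairs (DSA and RSA) and the key pair of the user
    # Ganeti runs as to the new node, so it can talk to the other nodes.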
    keyarray = []
1744
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1745
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1746
                priv_key, pub_key]
1747

    
1748
    for i in keyfiles:
1749
      f = open(i, 'r')
1750
      try:
1751
        keyarray.append(f.read())
1752
      finally:
1753
        f.close()
1754

    
1755
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1756
                               keyarray[3], keyarray[4], keyarray[5])
1757

    
1758
    if not result:
1759
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1760

    
1761
    # Add node to our /etc/hosts, and add key to known_hosts
1762
    utils.AddHostToEtcHosts(new_node.name)
1763

    
1764
    if new_node.secondary_ip != new_node.primary_ip:
1765
      if not rpc.call_node_tcp_ping(new_node.name,
1766
                                    constants.LOCALHOST_IP_ADDRESS,
1767
                                    new_node.secondary_ip,
1768
                                    constants.DEFAULT_NODED_PORT,
1769
                                    10, False):
1770
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1771
                                 " you gave (%s). Please fix and re-run this"
1772
                                 " command." % new_node.secondary_ip)
1773

    
1774
    node_verify_list = [self.cfg.GetMasterNode()]
1775
    node_verify_param = {
1776
      'nodelist': [node],
1777
      # TODO: do a node-net-test as well?
1778
    }
1779

    
1780
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1781
    for verifier in node_verify_list:
1782
      if not result[verifier]:
1783
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1784
                                 " for remote verification" % verifier)
1785
      if result[verifier]['nodelist']:
1786
        for failed in result[verifier]['nodelist']:
1787
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1788
                      (verifier, result[verifier]['nodelist'][failed]))
1789
        raise errors.OpExecError("ssh/hostname verification failed.")
1790

    
1791
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1792
    # including the node just added
1793
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1794
    dist_nodes = self.cfg.GetNodeList()
1795
    if not self.op.readd:
1796
      dist_nodes.append(node)
1797
    if myself.name in dist_nodes:
1798
      dist_nodes.remove(myself.name)
1799

    
1800
    logger.Debug("Copying hosts and known_hosts to all nodes")
1801
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1802
      result = rpc.call_upload_file(dist_nodes, fname)
1803
      for to_node in dist_nodes:
1804
        if not result[to_node]:
1805
          logger.Error("copy of file %s to node %s failed" %
1806
                       (fname, to_node))
1807

    
1808
    to_copy = []
1809
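    # The VNC password file is only relevant for HVM instances, so copy it
    # only when the cluster runs the Xen HVM hypervisor.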
    if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
1810
      to_copy.append(constants.VNC_PASSWORD_FILE)
1811
    for fname in to_copy:
1812
      result = rpc.call_upload_file([node], fname)
1813
      if not result[node]:
1814
        logger.Error("could not copy file %s to node %s" % (fname, node))
1815

    
1816
    if self.op.readd:
1817
      self.context.ReaddNode(new_node)
1818
    else:
1819
      self.context.AddNode(new_node)
1820

    
1821

    
1822
class LUQueryClusterInfo(NoHooksLU):
1823
  """Query cluster configuration.
1824

1825
  """
1826
  _OP_REQP = []
1827
  REQ_MASTER = False
1828
  REQ_BGL = False
1829

    
1830
  def ExpandNames(self):
1831
    self.needed_locks = {}
1832

    
1833
  def CheckPrereq(self):
1834
    """No prerequsites needed for this LU.
1835

1836
    """
1837
    pass
1838

    
1839
  def Exec(self, feedback_fn):
1840
    """Return cluster config.
1841

1842
    """
1843
    result = {
1844
      "name": self.cfg.GetClusterName(),
1845
      "software_version": constants.RELEASE_VERSION,
1846
      "protocol_version": constants.PROTOCOL_VERSION,
1847
      "config_version": constants.CONFIG_VERSION,
1848
      "os_api_version": constants.OS_API_VERSION,
1849
      "export_version": constants.EXPORT_VERSION,
1850
      "master": self.cfg.GetMasterNode(),
1851
      "architecture": (platform.architecture()[0], platform.machine()),
1852
      "hypervisor_type": self.cfg.GetHypervisorType(),
1853
      }
1854

    
1855
    return result
1856

    
1857

    
1858
class LUQueryConfigValues(NoHooksLU):
1859
  """Return configuration values.
1860

1861
  """
1862
  _OP_REQP = []
1863
  REQ_BGL = False
1864

    
1865
  def ExpandNames(self):
1866
    self.needed_locks = {}
1867

    
1868
    static_fields = ["cluster_name", "master_node"]
1869
    _CheckOutputFields(static=static_fields,
1870
                       dynamic=[],
1871
                       selected=self.op.output_fields)
1872

    
1873
  def CheckPrereq(self):
1874
    """No prerequisites.
1875

1876
    """
1877
    pass
1878

    
1879
  def Exec(self, feedback_fn):
1880
    """Dump a representation of the cluster config to the standard output.
1881

1882
    """
1883
    values = []
1884
    for field in self.op.output_fields:
1885
      if field == "cluster_name":
1886
        values.append(self.cfg.GetClusterName())
1887
      elif field == "master_node":
1888
        values.append(self.cfg.GetMasterNode())
1889
      else:
1890
        raise errors.ParameterError(field)
1891
    return values
1892

    
1893

    
1894
class LUActivateInstanceDisks(NoHooksLU):
1895
  """Bring up an instance's disks.
1896

1897
  """
1898
  _OP_REQP = ["instance_name"]
1899
  REQ_BGL = False
1900

    
1901
  def ExpandNames(self):
1902
    self._ExpandAndLockInstance()
1903
    self.needed_locks[locking.LEVEL_NODE] = []
1904
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1905

    
1906
  def DeclareLocks(self, level):
1907
    if level == locking.LEVEL_NODE:
1908
      self._LockInstancesNodes()
1909

    
1910
  def CheckPrereq(self):
1911
    """Check prerequisites.
1912

1913
    This checks that the instance is in the cluster.
1914

1915
    """
1916
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1917
    assert self.instance is not None, \
1918
      "Cannot retrieve locked instance %s" % self.op.instance_name
1919

    
1920
  def Exec(self, feedback_fn):
1921
    """Activate the disks.
1922

1923
    """
1924
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1925
    if not disks_ok:
1926
      raise errors.OpExecError("Cannot activate block devices")
1927

    
1928
    return disks_info
1929

    
1930

    
1931
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1932
  """Prepare the block devices for an instance.
1933

1934
  This sets up the block devices on all nodes.
1935

1936
  Args:
1937
    instance: a ganeti.objects.Instance object
1938
    ignore_secondaries: if true, errors on secondary nodes won't result
1939
                        in an error return from the function
1940

1941
  Returns:
    a tuple (disks_ok, device_info), where disks_ok is a boolean and
    device_info is a list of (host, instance_visible_name, node_visible_name)
    tuples mapping node devices to instance devices
1945
  """
1946
  device_info = []
1947
  disks_ok = True
1948
  iname = instance.name
1949
  # With the two passes mechanism we try to reduce the window of
1950
  # opportunity for the race condition of switching DRBD to primary
1951
  # before handshaking occurred, but we do not eliminate it
1952

    
1953
  # The proper fix would be to wait (with some limits) until the
1954
  # connection has been made and drbd transitions from WFConnection
1955
  # into any other network-connected state (Connected, SyncTarget,
1956
  # SyncSource, etc.)
1957

    
1958
  # 1st pass, assemble on all nodes in secondary mode
1959
  for inst_disk in instance.disks:
1960
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1961
      cfg.SetDiskID(node_disk, node)
1962
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1963
      if not result:
1964
        logger.Error("could not prepare block device %s on node %s"
1965
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1966
        if not ignore_secondaries:
1967
          disks_ok = False
1968

    
1969
  # FIXME: race condition on drbd migration to primary
1970

    
1971
  # 2nd pass, do only the primary node
1972
  for inst_disk in instance.disks:
1973
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1974
      if node != instance.primary_node:
1975
        continue
1976
      cfg.SetDiskID(node_disk, node)
1977
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1978
      if not result:
1979
        logger.Error("could not prepare block device %s on node %s"
1980
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1981
        disks_ok = False
1982
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1983
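  # device_info now holds one (primary node, instance-visible name, assemble
  # result) tuple per disk, which the caller can report back to the user.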

    
1984
  # leave the disks configured for the primary node
1985
  # this is a workaround that would be fixed better by
1986
  # improving the logical/physical id handling
1987
  for disk in instance.disks:
1988
    cfg.SetDiskID(disk, instance.primary_node)
1989

    
1990
  return disks_ok, device_info
1991

    
1992

    
1993
def _StartInstanceDisks(cfg, instance, force):
1994
  """Start the disks of an instance.
1995

1996
  """
1997
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1998
                                           ignore_secondaries=force)
1999
  if not disks_ok:
2000
    _ShutdownInstanceDisks(instance, cfg)
2001
    if force is not None and not force:
2002
      logger.Error("If the message above refers to a secondary node,"
2003
                   " you can retry the operation using '--force'.")
2004
    raise errors.OpExecError("Disk consistency error")
2005

    
2006

    
2007
class LUDeactivateInstanceDisks(NoHooksLU):
2008
  """Shutdown an instance's disks.
2009

2010
  """
2011
  _OP_REQP = ["instance_name"]
2012
  REQ_BGL = False
2013

    
2014
  def ExpandNames(self):
2015
    self._ExpandAndLockInstance()
2016
    self.needed_locks[locking.LEVEL_NODE] = []
2017
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2018

    
2019
  def DeclareLocks(self, level):
2020
    if level == locking.LEVEL_NODE:
2021
      self._LockInstancesNodes()
2022

    
2023
  def CheckPrereq(self):
2024
    """Check prerequisites.
2025

2026
    This checks that the instance is in the cluster.
2027

2028
    """
2029
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2030
    assert self.instance is not None, \
2031
      "Cannot retrieve locked instance %s" % self.op.instance_name
2032

    
2033
  def Exec(self, feedback_fn):
2034
    """Deactivate the disks
2035

2036
    """
2037
    instance = self.instance
2038
    _SafeShutdownInstanceDisks(instance, self.cfg)
2039

    
2040

    
2041
def _SafeShutdownInstanceDisks(instance, cfg):
2042
  """Shutdown block devices of an instance.
2043

2044
  This function checks if an instance is running, before calling
2045
  _ShutdownInstanceDisks.
2046

2047
  """
2048
  ins_l = rpc.call_instance_list([instance.primary_node])
2049
  ins_l = ins_l[instance.primary_node]
2050
  if not type(ins_l) is list:
2051
    raise errors.OpExecError("Can't contact node '%s'" %
2052
                             instance.primary_node)
2053

    
2054
  if instance.name in ins_l:
2055
    raise errors.OpExecError("Instance is running, can't shutdown"
2056
                             " block devices.")
2057

    
2058
  _ShutdownInstanceDisks(instance, cfg)
2059

    
2060

    
2061
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2062
  """Shutdown block devices of an instance.
2063

2064
  This does the shutdown on all nodes of the instance.
2065

2066
  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they mark the whole shutdown as failed.
2068

2069
  """
2070
  result = True
2071
  for disk in instance.disks:
2072
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2073
      cfg.SetDiskID(top_disk, node)
2074
      if not rpc.call_blockdev_shutdown(node, top_disk):
2075
        logger.Error("could not shutdown block device %s on node %s" %
2076
                     (disk.iv_name, node))
2077
        if not ignore_primary or node != instance.primary_node:
2078
          result = False
2079
  return result
2080

    
2081

    
2082
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2083
  """Checks if a node has enough free memory.
2084

2085
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2089

2090
  Args:
2091
    - cfg: a ConfigWriter instance
2092
    - node: the node name
2093
    - reason: string to use in the error message
2094
    - requested: the amount of memory in MiB
2095

2096
  """
2097
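  # Query only the node in question; a missing or malformed answer aborts
  # the operation just like insufficient memory would.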
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2098
  if not nodeinfo or not isinstance(nodeinfo, dict):
2099
    raise errors.OpPrereqError("Could not contact node %s for resource"
2100
                             " information" % (node,))
2101

    
2102
  free_mem = nodeinfo[node].get('memory_free')
2103
  if not isinstance(free_mem, int):
2104
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2105
                             " was '%s'" % (node, free_mem))
2106
  if requested > free_mem:
2107
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2108
                             " needed %s MiB, available %s MiB" %
2109
                             (node, reason, requested, free_mem))
2110

    
2111

    
2112
class LUStartupInstance(LogicalUnit):
2113
  """Starts an instance.
2114

2115
  """
2116
  HPATH = "instance-start"
2117
  HTYPE = constants.HTYPE_INSTANCE
2118
  _OP_REQP = ["instance_name", "force"]
2119
  REQ_BGL = False
2120

    
2121
  def ExpandNames(self):
2122
    self._ExpandAndLockInstance()
2123
    self.needed_locks[locking.LEVEL_NODE] = []
2124
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2125

    
2126
  def DeclareLocks(self, level):
2127
    if level == locking.LEVEL_NODE:
2128
      self._LockInstancesNodes()
2129

    
2130
  def BuildHooksEnv(self):
2131
    """Build hooks env.
2132

2133
    This runs on master, primary and secondary nodes of the instance.
2134

2135
    """
2136
    env = {
2137
      "FORCE": self.op.force,
2138
      }
2139
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2140
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2141
          list(self.instance.secondary_nodes))
2142
    return env, nl, nl
2143

    
2144
  def CheckPrereq(self):
2145
    """Check prerequisites.
2146

2147
    This checks that the instance is in the cluster.
2148

2149
    """
2150
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2151
    assert self.instance is not None, \
2152
      "Cannot retrieve locked instance %s" % self.op.instance_name
2153

    
2154
    # check bridges existence
2155
    _CheckInstanceBridgesExist(instance)
2156

    
2157
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2158
                         "starting instance %s" % instance.name,
2159
                         instance.memory)
2160

    
2161
  def Exec(self, feedback_fn):
2162
    """Start the instance.
2163

2164
    """
2165
    instance = self.instance
2166
    force = self.op.force
2167
    extra_args = getattr(self.op, "extra_args", "")
2168

    
2169
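    # Mark the instance as up in the configuration before starting it; if
    # the actual start fails below, the disks are shut down again and an
    # error is raised.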
    self.cfg.MarkInstanceUp(instance.name)
2170

    
2171
    node_current = instance.primary_node
2172

    
2173
    _StartInstanceDisks(self.cfg, instance, force)
2174

    
2175
    if not rpc.call_instance_start(node_current, instance, extra_args):
2176
      _ShutdownInstanceDisks(instance, self.cfg)
2177
      raise errors.OpExecError("Could not start instance")
2178

    
2179

    
2180
class LURebootInstance(LogicalUnit):
2181
  """Reboot an instance.
2182

2183
  """
2184
  HPATH = "instance-reboot"
2185
  HTYPE = constants.HTYPE_INSTANCE
2186
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2187
  REQ_BGL = False
2188

    
2189
  def ExpandNames(self):
2190
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2191
                                   constants.INSTANCE_REBOOT_HARD,
2192
                                   constants.INSTANCE_REBOOT_FULL]:
2193
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2194
                                  (constants.INSTANCE_REBOOT_SOFT,
2195
                                   constants.INSTANCE_REBOOT_HARD,
2196
                                   constants.INSTANCE_REBOOT_FULL))
2197
    self._ExpandAndLockInstance()
2198
    self.needed_locks[locking.LEVEL_NODE] = []
2199
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2200

    
2201
  def DeclareLocks(self, level):
2202
    if level == locking.LEVEL_NODE:
2203
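      # only a full reboot restarts the instance's disks, so only in that
      # case do the secondary nodes need to be locked as well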
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2204
      self._LockInstancesNodes(primary_only=primary_only)
2205

    
2206
  def BuildHooksEnv(self):
2207
    """Build hooks env.
2208

2209
    This runs on master, primary and secondary nodes of the instance.
2210

2211
    """
2212
    env = {
2213
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2214
      }
2215
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2216
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2217
          list(self.instance.secondary_nodes))
2218
    return env, nl, nl
2219

    
2220
  def CheckPrereq(self):
2221
    """Check prerequisites.
2222

2223
    This checks that the instance is in the cluster.
2224

2225
    """
2226
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2227
    assert self.instance is not None, \
2228
      "Cannot retrieve locked instance %s" % self.op.instance_name
2229

    
2230
    # check bridges existance
2231
    _CheckInstanceBridgesExist(instance)
2232

    
2233
  def Exec(self, feedback_fn):
2234
    """Reboot the instance.
2235

2236
    """
2237
    instance = self.instance
2238
    ignore_secondaries = self.op.ignore_secondaries
2239
    reboot_type = self.op.reboot_type
2240
    extra_args = getattr(self.op, "extra_args", "")
2241

    
2242
    node_current = instance.primary_node
2243

    
2244
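    # Soft and hard reboots are handled by the node daemon in a single call;
    # a full reboot is emulated by shutting the instance down, restarting
    # its disks and starting it again.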
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2245
                       constants.INSTANCE_REBOOT_HARD]:
2246
      if not rpc.call_instance_reboot(node_current, instance,
2247
                                      reboot_type, extra_args):
2248
        raise errors.OpExecError("Could not reboot instance")
2249
    else:
2250
      if not rpc.call_instance_shutdown(node_current, instance):
2251
        raise errors.OpExecError("could not shutdown instance for full reboot")
2252
      _ShutdownInstanceDisks(instance, self.cfg)
2253
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2254
      if not rpc.call_instance_start(node_current, instance, extra_args):
2255
        _ShutdownInstanceDisks(instance, self.cfg)
2256
        raise errors.OpExecError("Could not start instance for full reboot")
2257

    
2258
    self.cfg.MarkInstanceUp(instance.name)
2259

    
2260

    
2261
class LUShutdownInstance(LogicalUnit):
2262
  """Shutdown an instance.
2263

2264
  """
2265
  HPATH = "instance-stop"
2266
  HTYPE = constants.HTYPE_INSTANCE
2267
  _OP_REQP = ["instance_name"]
2268
  REQ_BGL = False
2269

    
2270
  def ExpandNames(self):
2271
    self._ExpandAndLockInstance()
2272
    self.needed_locks[locking.LEVEL_NODE] = []
2273
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2274

    
2275
  def DeclareLocks(self, level):
2276
    if level == locking.LEVEL_NODE:
2277
      self._LockInstancesNodes()
2278

    
2279
  def BuildHooksEnv(self):
2280
    """Build hooks env.
2281

2282
    This runs on master, primary and secondary nodes of the instance.
2283

2284
    """
2285
    env = _BuildInstanceHookEnvByObject(self.instance)
2286
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2287
          list(self.instance.secondary_nodes))
2288
    return env, nl, nl
2289

    
2290
  def CheckPrereq(self):
2291
    """Check prerequisites.
2292

2293
    This checks that the instance is in the cluster.
2294

2295
    """
2296
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2297
    assert self.instance is not None, \
2298
      "Cannot retrieve locked instance %s" % self.op.instance_name
2299

    
2300
  def Exec(self, feedback_fn):
2301
    """Shutdown the instance.
2302

2303
    """
2304
    instance = self.instance
2305
    node_current = instance.primary_node
2306
    self.cfg.MarkInstanceDown(instance.name)
2307
    if not rpc.call_instance_shutdown(node_current, instance):
2308
      logger.Error("could not shutdown instance")
2309

    
2310
    _ShutdownInstanceDisks(instance, self.cfg)
2311

    
2312

    
2313
class LUReinstallInstance(LogicalUnit):
2314
  """Reinstall an instance.
2315

2316
  """
2317
  HPATH = "instance-reinstall"
2318
  HTYPE = constants.HTYPE_INSTANCE
2319
  _OP_REQP = ["instance_name"]
2320
  REQ_BGL = False
2321

    
2322
  def ExpandNames(self):
2323
    self._ExpandAndLockInstance()
2324
    self.needed_locks[locking.LEVEL_NODE] = []
2325
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2326

    
2327
  def DeclareLocks(self, level):
2328
    if level == locking.LEVEL_NODE:
2329
      self._LockInstancesNodes()
2330

    
2331
  def BuildHooksEnv(self):
2332
    """Build hooks env.
2333

2334
    This runs on master, primary and secondary nodes of the instance.
2335

2336
    """
2337
    env = _BuildInstanceHookEnvByObject(self.instance)
2338
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2339
          list(self.instance.secondary_nodes))
2340
    return env, nl, nl
2341

    
2342
  def CheckPrereq(self):
2343
    """Check prerequisites.
2344

2345
    This checks that the instance is in the cluster and is not running.
2346

2347
    """
2348
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2349
    assert instance is not None, \
2350
      "Cannot retrieve locked instance %s" % self.op.instance_name
2351

    
2352
    if instance.disk_template == constants.DT_DISKLESS:
2353
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2354
                                 self.op.instance_name)
2355
    if instance.status != "down":
2356
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2357
                                 self.op.instance_name)
2358
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2359
    if remote_info:
2360
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2361
                                 (self.op.instance_name,
2362
                                  instance.primary_node))
2363

    
2364
    self.op.os_type = getattr(self.op, "os_type", None)
2365
    if self.op.os_type is not None:
2366
      # OS verification
2367
      pnode = self.cfg.GetNodeInfo(
2368
        self.cfg.ExpandNodeName(instance.primary_node))
2369
      if pnode is None:
2370
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2371
                                   instance.primary_node)
2372
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2373
      if not os_obj:
2374
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2375
                                   " primary node"  % self.op.os_type)
2376

    
2377
    self.instance = instance
2378

    
2379
  def Exec(self, feedback_fn):
2380
    """Reinstall the instance.
2381

2382
    """
2383
    inst = self.instance
2384

    
2385
    if self.op.os_type is not None:
2386
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2387
      inst.os = self.op.os_type
2388
      self.cfg.Update(inst)
2389

    
2390
    _StartInstanceDisks(self.cfg, inst, None)
2391
    try:
2392
      feedback_fn("Running the instance OS create scripts...")
2393
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2394
        raise errors.OpExecError("Could not install OS for instance %s"
2395
                                 " on node %s" %
2396
                                 (inst.name, inst.primary_node))
2397
    finally:
2398
      _ShutdownInstanceDisks(inst, self.cfg)
2399

    
2400

    
2401
class LURenameInstance(LogicalUnit):
2402
  """Rename an instance.
2403

2404
  """
2405
  HPATH = "instance-rename"
2406
  HTYPE = constants.HTYPE_INSTANCE
2407
  _OP_REQP = ["instance_name", "new_name"]
2408

    
2409
  def BuildHooksEnv(self):
2410
    """Build hooks env.
2411

2412
    This runs on master, primary and secondary nodes of the instance.
2413

2414
    """
2415
    env = _BuildInstanceHookEnvByObject(self.instance)
2416
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2417
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2418
          list(self.instance.secondary_nodes))
2419
    return env, nl, nl
2420

    
2421
  def CheckPrereq(self):
2422
    """Check prerequisites.
2423

2424
    This checks that the instance is in the cluster and is not running.
2425

2426
    """
2427
    instance = self.cfg.GetInstanceInfo(
2428
      self.cfg.ExpandInstanceName(self.op.instance_name))
2429
    if instance is None:
2430
      raise errors.OpPrereqError("Instance '%s' not known" %
2431
                                 self.op.instance_name)
2432
    if instance.status != "down":
2433
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2434
                                 self.op.instance_name)
2435
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2436
    if remote_info:
2437
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2438
                                 (self.op.instance_name,
2439
                                  instance.primary_node))
2440
    self.instance = instance
2441

    
2442
    # new name verification
2443
    name_info = utils.HostInfo(self.op.new_name)
2444

    
2445
    self.op.new_name = new_name = name_info.name
2446
    instance_list = self.cfg.GetInstanceList()
2447
    if new_name in instance_list:
2448
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2449
                                 new_name)
2450

    
2451
    if not getattr(self.op, "ignore_ip", False):
2452
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2453
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2454
                                   (name_info.ip, new_name))
2455

    
2456

    
2457
  def Exec(self, feedback_fn):
2458
    """Reinstall the instance.
2459

2460
    """
2461
    inst = self.instance
2462
    old_name = inst.name
2463

    
2464
    if inst.disk_template == constants.DT_FILE:
2465
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2466

    
2467
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2468
    # Change the instance lock. This is definitely safe while we hold the BGL
2469
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2470
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2471

    
2472
    # re-read the instance from the configuration after rename
2473
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2474

    
2475
    if inst.disk_template == constants.DT_FILE:
2476
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2477
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2478
                                                old_file_storage_dir,
2479
                                                new_file_storage_dir)
2480

    
2481
      if not result:
2482
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2483
                                 " directory '%s' to '%s' (but the instance"
2484
                                 " has been renamed in Ganeti)" % (
2485
                                 inst.primary_node, old_file_storage_dir,
2486
                                 new_file_storage_dir))
2487

    
2488
      if not result[0]:
2489
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2490
                                 " (but the instance has been renamed in"
2491
                                 " Ganeti)" % (old_file_storage_dir,
2492
                                               new_file_storage_dir))
2493

    
2494
    _StartInstanceDisks(self.cfg, inst, None)
2495
    try:
2496
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2497
                                          "sda", "sdb"):
2498
        msg = ("Could not run OS rename script for instance %s on node %s"
2499
               " (but the instance has been renamed in Ganeti)" %
2500
               (inst.name, inst.primary_node))
2501
        logger.Error(msg)
2502
    finally:
2503
      _ShutdownInstanceDisks(inst, self.cfg)
2504

    
2505

    
2506
class LURemoveInstance(LogicalUnit):
2507
  """Remove an instance.
2508

2509
  """
2510
  HPATH = "instance-remove"
2511
  HTYPE = constants.HTYPE_INSTANCE
2512
  _OP_REQP = ["instance_name", "ignore_failures"]
2513
  REQ_BGL = False
2514

    
2515
  def ExpandNames(self):
2516
    self._ExpandAndLockInstance()
2517
    self.needed_locks[locking.LEVEL_NODE] = []
2518
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2519

    
2520
  def DeclareLocks(self, level):
2521
    if level == locking.LEVEL_NODE:
2522
      self._LockInstancesNodes()
2523

    
2524
  def BuildHooksEnv(self):
2525
    """Build hooks env.
2526

2527
    This runs on master, primary and secondary nodes of the instance.
2528

2529
    """
2530
    env = _BuildInstanceHookEnvByObject(self.instance)
2531
    nl = [self.cfg.GetMasterNode()]
2532
    return env, nl, nl
2533

    
2534
  def CheckPrereq(self):
2535
    """Check prerequisites.
2536

2537
    This checks that the instance is in the cluster.
2538

2539
    """
2540
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2541
    assert self.instance is not None, \
2542
      "Cannot retrieve locked instance %s" % self.op.instance_name
2543

    
2544
  def Exec(self, feedback_fn):
2545
    """Remove the instance.
2546

2547
    """
2548
    instance = self.instance
2549
    logger.Info("shutting down instance %s on node %s" %
2550
                (instance.name, instance.primary_node))
2551

    
2552
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2553
      if self.op.ignore_failures:
2554
        feedback_fn("Warning: can't shutdown instance")
2555
      else:
2556
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2557
                                 (instance.name, instance.primary_node))
2558

    
2559
    logger.Info("removing block devices for instance %s" % instance.name)
2560

    
2561
    if not _RemoveDisks(instance, self.cfg):
2562
      if self.op.ignore_failures:
2563
        feedback_fn("Warning: can't remove instance's disks")
2564
      else:
2565
        raise errors.OpExecError("Can't remove instance's disks")
2566

    
2567
    logger.Info("removing instance %s out of cluster config" % instance.name)
2568

    
2569
    self.cfg.RemoveInstance(instance.name)
2570
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2571

    
2572

    
2573
class LUQueryInstances(NoHooksLU):
2574
  """Logical unit for querying instances.
2575

2576
  """
2577
  _OP_REQP = ["output_fields", "names"]
2578
  REQ_BGL = False
2579

    
2580
  def ExpandNames(self):
2581
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2582
    self.static_fields = frozenset([
2583
      "name", "os", "pnode", "snodes",
2584
      "admin_state", "admin_ram",
2585
      "disk_template", "ip", "mac", "bridge",
2586
      "sda_size", "sdb_size", "vcpus", "tags",
2587
      "network_port", "kernel_path", "initrd_path",
2588
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2589
      "hvm_cdrom_image_path", "hvm_nic_type",
2590
      "hvm_disk_type", "vnc_bind_address",
2591
      "serial_no",
2592
      ])
2593
    _CheckOutputFields(static=self.static_fields,
2594
                       dynamic=self.dynamic_fields,
2595
                       selected=self.op.output_fields)
2596

    
2597
    self.needed_locks = {}
2598
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2599
    self.share_locks[locking.LEVEL_NODE] = 1
2600

    
2601
    if self.op.names:
2602
      self.wanted = _GetWantedInstances(self, self.op.names)
2603
    else:
2604
      self.wanted = locking.ALL_SET
2605

    
2606
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2607
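    # As for node queries, locking is only needed when non-static fields
    # were requested; purely static queries are answered from the
    # configuration alone.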
    if self.do_locking:
2608
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2609
      self.needed_locks[locking.LEVEL_NODE] = []
2610
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2611

    
2612
  def DeclareLocks(self, level):
2613
    if level == locking.LEVEL_NODE and self.do_locking:
2614
      self._LockInstancesNodes()
2615

    
2616
  def CheckPrereq(self):
2617
    """Check prerequisites.
2618

2619
    """
2620
    pass
2621

    
2622
  def Exec(self, feedback_fn):
2623
    """Computes the list of nodes and their attributes.
2624

2625
    """
2626
    all_info = self.cfg.GetAllInstancesInfo()
2627
    if self.do_locking:
2628
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2629
    elif self.wanted != locking.ALL_SET:
2630
      instance_names = self.wanted
2631
      missing = set(instance_names).difference(all_info.keys())
2632
      if missing:
2633
        raise errors.OpExecError(
2634
          "Some instances were removed before retrieving their data: %s"
2635
          % missing)
2636
    else:
2637
      instance_names = all_info.keys()
2638
    instance_list = [all_info[iname] for iname in instance_names]
2639

    
2640
    # begin data gathering
2641

    
2642
    nodes = frozenset([inst.primary_node for inst in instance_list])
2643

    
2644
    bad_nodes = []
2645
    if self.dynamic_fields.intersection(self.op.output_fields):
2646
      live_data = {}
2647
      node_data = rpc.call_all_instances_info(nodes)
2648
      for name in nodes:
2649
        result = node_data[name]
2650
        if result:
2651
          live_data.update(result)
2652
        elif result == False:
2653
          bad_nodes.append(name)
2654
        # else no instance is alive
2655
    else:
2656
      live_data = dict([(name, {}) for name in instance_names])
2657

    
2658
    # end data gathering
2659

    
2660
    output = []
2661
    for instance in instance_list:
2662
      iout = []
2663
      for field in self.op.output_fields:
2664
        if field == "name":
2665
          val = instance.name
2666
        elif field == "os":
2667
          val = instance.os
2668
        elif field == "pnode":
2669
          val = instance.primary_node
2670
        elif field == "snodes":
2671
          val = list(instance.secondary_nodes)
2672
        elif field == "admin_state":
2673
          val = (instance.status != "down")
2674
        elif field == "oper_state":
2675
          if instance.primary_node in bad_nodes:
2676
            val = None
2677
          else:
2678
            val = bool(live_data.get(instance.name))
2679
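        # "status" combines the live state with the configured state;
        # mismatches are reported as ERROR_up/ERROR_down, and an unreachable
        # primary node as ERROR_nodedown.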
        elif field == "status":
2680
          if instance.primary_node in bad_nodes:
2681
            val = "ERROR_nodedown"
2682
          else:
2683
            running = bool(live_data.get(instance.name))
2684
            if running:
2685
              if instance.status != "down":
2686
                val = "running"
2687
              else:
2688
                val = "ERROR_up"
2689
            else:
2690
              if instance.status != "down":
2691
                val = "ERROR_down"
2692
              else:
2693
                val = "ADMIN_down"
2694
        elif field == "admin_ram":
2695
          val = instance.memory
2696
        elif field == "oper_ram":
2697
          if instance.primary_node in bad_nodes:
2698
            val = None
2699
          elif instance.name in live_data:
2700
            val = live_data[instance.name].get("memory", "?")
2701
          else:
2702
            val = "-"
2703
        elif field == "disk_template":
2704
          val = instance.disk_template
2705
        elif field == "ip":
2706
          val = instance.nics[0].ip
2707
        elif field == "bridge":
2708
          val = instance.nics[0].bridge
2709
        elif field == "mac":
2710
          val = instance.nics[0].mac
2711
        elif field == "sda_size" or field == "sdb_size":
2712
          disk = instance.FindDisk(field[:3])
2713
          if disk is None:
2714
            val = None
2715
          else:
2716
            val = disk.size
2717
        elif field == "vcpus":
2718
          val = instance.vcpus
2719
        elif field == "tags":
2720
          val = list(instance.GetTags())
2721
        elif field == "serial_no":
2722
          val = instance.serial_no
2723
        elif field in ("network_port", "kernel_path", "initrd_path",
2724
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2725
                       "hvm_cdrom_image_path", "hvm_nic_type",
2726
                       "hvm_disk_type", "vnc_bind_address"):
2727
          val = getattr(instance, field, None)
2728
          if val is not None:
2729
            pass
2730
          elif field in ("hvm_nic_type", "hvm_disk_type",
2731
                         "kernel_path", "initrd_path"):
2732
            val = "default"
2733
          else:
2734
            val = "-"
2735
        else:
2736
          raise errors.ParameterError(field)
2737
        iout.append(val)
2738
      output.append(iout)
2739

    
2740
    return output
2741

    
2742

    
2743
class LUFailoverInstance(LogicalUnit):
2744
  """Failover an instance.
2745

2746
  """
2747
  HPATH = "instance-failover"
2748
  HTYPE = constants.HTYPE_INSTANCE
2749
  _OP_REQP = ["instance_name", "ignore_consistency"]
2750
  REQ_BGL = False
2751

    
2752
  def ExpandNames(self):
2753
    self._ExpandAndLockInstance()
2754
    self.needed_locks[locking.LEVEL_NODE] = []
2755
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2756

    
2757
  def DeclareLocks(self, level):
2758
    if level == locking.LEVEL_NODE:
2759
      self._LockInstancesNodes()
2760

    
2761
  def BuildHooksEnv(self):
2762
    """Build hooks env.
2763

2764
    This runs on master, primary and secondary nodes of the instance.
2765

2766
    """
2767
    env = {
2768
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2769
      }
2770
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2771
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2772
    return env, nl, nl
2773

    
2774
  def CheckPrereq(self):
2775
    """Check prerequisites.
2776

2777
    This checks that the instance is in the cluster.
2778

2779
    """
2780
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2781
    assert self.instance is not None, \
2782
      "Cannot retrieve locked instance %s" % self.op.instance_name
2783

    
2784
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2785
      raise errors.OpPrereqError("Instance's disk layout is not"
2786
                                 " network mirrored, cannot failover.")
2787

    
2788
    secondary_nodes = instance.secondary_nodes
2789
    if not secondary_nodes:
2790
      raise errors.ProgrammerError("no secondary node but using "
2791
                                   "a mirrored disk template")
2792

    
2793
    target_node = secondary_nodes[0]
2794
    # check memory requirements on the secondary node
2795
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2796
                         instance.name, instance.memory)
2797

    
2798
    # check bridge existence
2799
    brlist = [nic.bridge for nic in instance.nics]
2800
    if not rpc.call_bridges_exist(target_node, brlist):
2801
      raise errors.OpPrereqError("One or more target bridges %s does not"
2802
                                 " exist on destination node '%s'" %
2803
                                 (brlist, target_node))
2804

    
2805
  def Exec(self, feedback_fn):
2806
    """Failover an instance.
2807

2808
    The failover is done by shutting it down on its present node and
2809
    starting it on the secondary.
2810

2811
    """
2812
    instance = self.instance
2813

    
2814
    source_node = instance.primary_node
2815
    target_node = instance.secondary_nodes[0]
2816

    
2817
    feedback_fn("* checking disk consistency between source and target")
2818
    for dev in instance.disks:
2819
      # for drbd, these are drbd over lvm
2820
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2821
        if instance.status == "up" and not self.op.ignore_consistency:
2822
          raise errors.OpExecError("Disk %s is degraded on target node,"
2823
                                   " aborting failover." % dev.iv_name)
2824

    
2825
    feedback_fn("* shutting down instance on source node")
2826
    logger.Info("Shutting down instance %s on node %s" %
2827
                (instance.name, source_node))
2828

    
2829
    if not rpc.call_instance_shutdown(source_node, instance):
2830
      if self.op.ignore_consistency:
2831
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2832
                     " anyway. Please make sure node %s is down"  %
2833
                     (instance.name, source_node, source_node))
2834
      else:
2835
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2836
                                 (instance.name, source_node))
2837

    
2838
    feedback_fn("* deactivating the instance's disks on source node")
2839
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2840
      raise errors.OpExecError("Can't shut down the instance's disks.")
2841

    
2842
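    # Changing the primary node in the configuration is what actually moves
    # the instance; from here on target_node is its primary.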
    instance.primary_node = target_node
2843
    # distribute new instance config to the other nodes
2844
    self.cfg.Update(instance)
2845

    
2846
    # Only start the instance if it's marked as up
2847
    if instance.status == "up":
2848
      feedback_fn("* activating the instance's disks on target node")
2849
      logger.Info("Starting instance %s on node %s" %
2850
                  (instance.name, target_node))
2851

    
2852
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2853
                                               ignore_secondaries=True)
2854
      if not disks_ok:
2855
        _ShutdownInstanceDisks(instance, self.cfg)
2856
        raise errors.OpExecError("Can't activate the instance's disks")
2857

    
2858
      feedback_fn("* starting the instance on the target node")
2859
      if not rpc.call_instance_start(target_node, instance, None):
2860
        _ShutdownInstanceDisks(instance, self.cfg)
2861
        raise errors.OpExecError("Could not start instance %s on node %s." %
2862
                                 (instance.name, target_node))
2863

    
2864

    
2865
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2866
  """Create a tree of block devices on the primary node.
2867

2868
  This always creates all devices.
2869

2870
  """
2871
  if device.children:
2872
    for child in device.children:
2873
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2874
        return False
2875

    
2876
  cfg.SetDiskID(device, node)
2877
  new_id = rpc.call_blockdev_create(node, device, device.size,
2878
                                    instance.name, True, info)
2879
  if not new_id:
2880
    return False
2881
  if device.physical_id is None:
2882
    device.physical_id = new_id
2883
  return True
2884

    
2885

    
2886
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2887
  """Create a tree of block devices on a secondary node.
2888

2889
  If this device type has to be created on secondaries, create it and
2890
  all its children.
2891

2892
  If not, just recurse to children keeping the same 'force' value.
2893

2894
  """
2895
  if device.CreateOnSecondary():
2896
    force = True
2897
  if device.children:
2898
    for child in device.children:
2899
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2900
                                        child, force, info):
2901
        return False
2902

    
2903
  if not force:
2904
    return True
2905
  cfg.SetDiskID(device, node)
2906
  new_id = rpc.call_blockdev_create(node, device, device.size,
2907
                                    instance.name, False, info)
2908
  if not new_id:
2909
    return False
2910
  if device.physical_id is None:
2911
    device.physical_id = new_id
2912
  return True
2913

    
2914

    
2915
def _GenerateUniqueNames(cfg, exts):
2916
  """Generate a suitable LV name.
2917

2918
  This will generate a logical volume name for the given instance.
2919

2920
  """
2921
  results = []
2922
  for val in exts:
2923
    new_id = cfg.GenerateUniqueID()
2924
    results.append("%s%s" % (new_id, val))
2925
  return results
2926

    
2927

    
2928
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2929
                         p_minor, s_minor):
2930
  """Generate a drbd8 device complete with its children.
2931

2932
  """
2933
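  # A DRBD8 disk is backed by two LVs: one for the data and a fixed 128 MiB
  # volume for the DRBD metadata; both become children of the DRBD device.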
  port = cfg.AllocatePort()
2934
  vgname = cfg.GetVGName()
2935
  shared_secret = cfg.GenerateDRBDSecret()
2936
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2937
                          logical_id=(vgname, names[0]))
2938
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2939
                          logical_id=(vgname, names[1]))
2940
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2941
                          logical_id=(primary, secondary, port,
2942
                                      p_minor, s_minor,
2943
                                      shared_secret),
2944
                          children=[dev_data, dev_meta],
2945
                          iv_name=iv_name)
2946
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
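
# Illustrative sketch (assumption, not part of the module): for a plain
# (lvm-only) instance a call such as
#
#   _GenerateDiskTemplate(cfg, constants.DT_PLAIN, "inst1.example.com",
#                         "node1", [], 10240, 4096, "", None)
#
# yields two LD_LV disks of 10240 MB ("sda") and 4096 MB ("sdb"), while the
# DT_DRBD8 branch requires exactly one secondary node and allocates four DRBD
# minors (data and swap, on the primary and on the secondary).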


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
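
# Illustrative sketch (not part of the module): for disk_size=10240 and
# swap_size=4096 (MB) the requirements are
#   DT_PLAIN    -> 14336 MB
#   DT_DRBD8    -> 14592 MB (the extra 256 MB covers the two DRBD meta LVs)
#   DT_DISKLESS -> None (no space needed in the volume group)
#   DT_FILE     -> None (file-based storage does not use the volume group)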


class LUCreateInstance(LogicalUnit):
3120
  """Create an instance.
3121

3122
  """
3123
  HPATH = "instance-add"
3124
  HTYPE = constants.HTYPE_INSTANCE
3125
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3126
              "disk_template", "swap_size", "mode", "start", "vcpus",
3127
              "wait_for_sync", "ip_check", "mac"]
3128
  REQ_BGL = False
3129

    
3130
  def _ExpandNode(self, node):
3131
    """Expands and checks one node name.
3132

3133
    """
3134
    node_full = self.cfg.ExpandNodeName(node)
3135
    if node_full is None:
3136
      raise errors.OpPrereqError("Unknown node %s" % node)
3137
    return node_full
3138

    
3139
  def ExpandNames(self):
3140
    """ExpandNames for CreateInstance.
3141

3142
    Figure out the right locks for instance creation.
3143

3144
    """
3145
    self.needed_locks = {}
3146

    
3147
    # set optional parameters to none if they don't exist
3148
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3149
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3150
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3151
                 "vnc_bind_address"]:
3152
      if not hasattr(self.op, attr):
3153
        setattr(self.op, attr, None)
3154

    
3155
    # verify creation mode
3156
    if self.op.mode not in (constants.INSTANCE_CREATE,
3157
                            constants.INSTANCE_IMPORT):
3158
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3159
                                 self.op.mode)
3160
    # disk template and mirror node verification
3161
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3162
      raise errors.OpPrereqError("Invalid disk template name")
3163

    
3164
    #### instance parameters check
3165

    
3166
    # instance name verification
3167
    hostname1 = utils.HostInfo(self.op.instance_name)
3168
    self.op.instance_name = instance_name = hostname1.name
3169

    
3170
    # this is just a preventive check, but someone might still add this
3171
    # instance in the meantime, and creation will fail at lock-add time
3172
    if instance_name in self.cfg.GetInstanceList():
3173
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3174
                                 instance_name)
3175

    
3176
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3177

    
3178
    # ip validity checks
3179
    ip = getattr(self.op, "ip", None)
3180
    if ip is None or ip.lower() == "none":
3181
      inst_ip = None
3182
    elif ip.lower() == "auto":
3183
      inst_ip = hostname1.ip
3184
    else:
3185
      if not utils.IsValidIP(ip):
3186
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3187
                                   " like a valid IP" % ip)
3188
      inst_ip = ip
3189
    self.inst_ip = self.op.ip = inst_ip
3190
    # used in CheckPrereq for ip ping check
3191
    self.check_ip = hostname1.ip
3192

    
3193
    # MAC address verification
3194
    if self.op.mac != "auto":
3195
      if not utils.IsValidMac(self.op.mac.lower()):
3196
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3197
                                   self.op.mac)
3198

    
3199
    # boot order verification
3200
    if self.op.hvm_boot_order is not None:
3201
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3202
        raise errors.OpPrereqError("invalid boot order specified,"
3203
                                   " must be one or more of [acdn]")
3204
    # file storage checks
3205
    if (self.op.file_driver and
3206
        not self.op.file_driver in constants.FILE_DRIVER):
3207
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3208
                                 self.op.file_driver)
3209

    
3210
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3211
      raise errors.OpPrereqError("File storage directory path not absolute")
3212

    
3213
    ### Node/iallocator related checks
3214
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3215
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3216
                                 " node must be given")
3217

    
3218
    if self.op.iallocator:
3219
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3220
    else:
3221
      self.op.pnode = self._ExpandNode(self.op.pnode)
3222
      nodelist = [self.op.pnode]
3223
      if self.op.snode is not None:
3224
        self.op.snode = self._ExpandNode(self.op.snode)
3225
        nodelist.append(self.op.snode)
3226
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3227

    
3228
    # in case of import lock the source node too
3229
    if self.op.mode == constants.INSTANCE_IMPORT:
3230
      src_node = getattr(self.op, "src_node", None)
3231
      src_path = getattr(self.op, "src_path", None)
3232

    
3233
      if src_node is None or src_path is None:
3234
        raise errors.OpPrereqError("Importing an instance requires source"
3235
                                   " node and path options")
3236

    
3237
      if not os.path.isabs(src_path):
3238
        raise errors.OpPrereqError("The source path must be absolute")
3239

    
3240
      self.op.src_node = src_node = self._ExpandNode(src_node)
3241
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3242
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3243

    
3244
    else: # INSTANCE_CREATE
3245
      if getattr(self.op, "os_type", None) is None:
3246
        raise errors.OpPrereqError("No guest OS specified")
3247

    
3248
  def _RunAllocator(self):
3249
    """Run the allocator based on input opcode.
3250

3251
    """
3252
    disks = [{"size": self.op.disk_size, "mode": "w"},
3253
             {"size": self.op.swap_size, "mode": "w"}]
3254
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3255
             "bridge": self.op.bridge}]
3256
    ial = IAllocator(self.cfg,
3257
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3258
                     name=self.op.instance_name,
3259
                     disk_template=self.op.disk_template,
3260
                     tags=[],
3261
                     os=self.op.os_type,
3262
                     vcpus=self.op.vcpus,
3263
                     mem_size=self.op.mem_size,
3264
                     disks=disks,
3265
                     nics=nics,
3266
                     )
3267

    
3268
    ial.Run(self.op.iallocator)
3269

    
3270
    if not ial.success:
3271
      raise errors.OpPrereqError("Can't compute nodes using"
3272
                                 " iallocator '%s': %s" % (self.op.iallocator,
3273
                                                           ial.info))
3274
    if len(ial.nodes) != ial.required_nodes:
3275
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3276
                                 " of nodes (%s), required %s" %
3277
                                 (self.op.iallocator, len(ial.nodes),
3278
                                  ial.required_nodes))
3279
    self.op.pnode = ial.nodes[0]
3280
    logger.ToStdout("Selected nodes for the instance: %s" %
3281
                    (", ".join(ial.nodes),))
3282
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3283
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3284
    if ial.required_nodes == 2:
3285
      self.op.snode = ial.nodes[1]
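    # Illustrative sketch (hypothetical values, not part of the module): for
    # a request with disk_size=10240, swap_size=4096, mem_size=2048, vcpus=2
    # the allocator input built above is roughly
    #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
    #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
    # and on success ial.nodes holds one node (plain/file templates) or two
    # (drbd), the first of which becomes the primary node.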
3286

    
3287
  def BuildHooksEnv(self):
3288
    """Build hooks env.
3289

3290
    This runs on master, primary and secondary nodes of the instance.
3291

3292
    """
3293
    env = {
3294
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3295
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3296
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3297
      "INSTANCE_ADD_MODE": self.op.mode,
3298
      }
3299
    if self.op.mode == constants.INSTANCE_IMPORT:
3300
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3301
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3302
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3303

    
3304
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3305
      primary_node=self.op.pnode,
3306
      secondary_nodes=self.secondaries,
3307
      status=self.instance_status,
3308
      os_type=self.op.os_type,
3309
      memory=self.op.mem_size,
3310
      vcpus=self.op.vcpus,
3311
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3312
    ))
3313

    
3314
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3315
          self.secondaries)
3316
    return env, nl, nl
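    # Illustrative sketch (assumption, not part of the module): for a new
    # drbd-based instance the dict above ends up with entries such as
    #   INSTANCE_DISK_TEMPLATE=drbd, INSTANCE_DISK_SIZE=10240,
    #   INSTANCE_SWAP_SIZE=4096, INSTANCE_ADD_MODE=create
    # plus the generic INSTANCE_* variables from _BuildInstanceHookEnv, and
    # the hook node list 'nl' is the master plus the primary and secondaries.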
3317

    
3318

    
3319
  def CheckPrereq(self):
3320
    """Check prerequisites.
3321

3322
    """
3323
    if (not self.cfg.GetVGName() and
3324
        self.op.disk_template not in constants.DTS_NOT_LVM):
3325
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3326
                                 " instances")
3327

    
3328
    if self.op.mode == constants.INSTANCE_IMPORT:
3329
      src_node = self.op.src_node
3330
      src_path = self.op.src_path
3331

    
3332
      export_info = rpc.call_export_info(src_node, src_path)
3333

    
3334
      if not export_info:
3335
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3336

    
3337
      if not export_info.has_section(constants.INISECT_EXP):
3338
        raise errors.ProgrammerError("Corrupted export config")
3339

    
3340
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3341
      if (int(ei_version) != constants.EXPORT_VERSION):
3342
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3343
                                   (ei_version, constants.EXPORT_VERSION))
3344

    
3345
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3346
        raise errors.OpPrereqError("Can't import instance with more than"
3347
                                   " one data disk")
3348

    
3349
      # FIXME: are the old os-es, disk sizes, etc. useful?
3350
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3351
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3352
                                                         'disk0_dump'))
3353
      self.src_image = diskimage
3354

    
3355
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3356

    
3357
    if self.op.start and not self.op.ip_check:
3358
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3359
                                 " adding an instance in start mode")
3360

    
3361
    if self.op.ip_check:
3362
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3363
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3364
                                   (self.check_ip, self.op.instance_name))
3365

    
3366
    # bridge verification
3367
    bridge = getattr(self.op, "bridge", None)
3368
    if bridge is None:
3369
      self.op.bridge = self.cfg.GetDefBridge()
3370
    else:
3371
      self.op.bridge = bridge
3372

    
3373
    #### allocator run
3374

    
3375
    if self.op.iallocator is not None:
3376
      self._RunAllocator()
3377

    
3378
    #### node related checks
3379

    
3380
    # check primary node
3381
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3382
    assert self.pnode is not None, \
3383
      "Cannot retrieve locked node %s" % self.op.pnode
3384
    self.secondaries = []
3385

    
3386
    # mirror node verification
3387
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3388
      if self.op.snode is None:
3389
        raise errors.OpPrereqError("The networked disk templates need"
3390
                                   " a mirror node")
3391
      if self.op.snode == pnode.name:
3392
        raise errors.OpPrereqError("The secondary node cannot be"
3393
                                   " the primary node.")
3394
      self.secondaries.append(self.op.snode)
3395

    
3396
    req_size = _ComputeDiskSize(self.op.disk_template,
3397
                                self.op.disk_size, self.op.swap_size)
3398

    
3399
    # Check lv size requirements
3400
    if req_size is not None:
3401
      nodenames = [pnode.name] + self.secondaries
3402
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3403
      for node in nodenames:
3404
        info = nodeinfo.get(node, None)
3405
        if not info:
3406
          raise errors.OpPrereqError("Cannot get current information"
3407
                                     " from node '%s'" % node)
3408
        vg_free = info.get('vg_free', None)
3409
        if not isinstance(vg_free, int):
3410
          raise errors.OpPrereqError("Can't compute free disk space on"
3411
                                     " node %s" % node)
3412
        if req_size > info['vg_free']:
3413
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3414
                                     " %d MB available, %d MB required" %
3415
                                     (node, info['vg_free'], req_size))
3416

    
3417
    # os verification
3418
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3419
    if not os_obj:
3420
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3421
                                 " primary node"  % self.op.os_type)
3422

    
3423
    if self.op.kernel_path == constants.VALUE_NONE:
3424
      raise errors.OpPrereqError("Can't set instance kernel to none")
3425

    
3426
    # bridge check on primary node
3427
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3428
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3429
                                 " destination node '%s'" %
3430
                                 (self.op.bridge, pnode.name))
3431

    
3432
    # memory check on primary node
3433
    if self.op.start:
3434
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3435
                           "creating instance %s" % self.op.instance_name,
3436
                           self.op.mem_size)
3437

    
3438
    # hvm_cdrom_image_path verification
3439
    if self.op.hvm_cdrom_image_path is not None:
3440
      # FIXME (als): shouldn't these checks happen on the destination node?
3441
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3442
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3443
                                   " be an absolute path or None, not %s" %
3444
                                   self.op.hvm_cdrom_image_path)
3445
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3446
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3447
                                   " regular file or a symlink pointing to"
3448
                                   " an existing regular file, not %s" %
3449
                                   self.op.hvm_cdrom_image_path)
3450

    
3451
    # vnc_bind_address verification
3452
    if self.op.vnc_bind_address is not None:
3453
      if not utils.IsValidIP(self.op.vnc_bind_address):
3454
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3455
                                   " like a valid IP address" %
3456
                                   self.op.vnc_bind_address)
3457

    
3458
    # Xen HVM device type checks
3459
    if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
3460
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3461
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3462
                                   " hypervisor" % self.op.hvm_nic_type)
3463
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3464
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3465
                                   " hypervisor" % self.op.hvm_disk_type)
3466

    
3467
    if self.op.start:
3468
      self.instance_status = 'up'
3469
    else:
3470
      self.instance_status = 'down'
3471

    
3472
  def Exec(self, feedback_fn):
3473
    """Create and add the instance to the cluster.
3474

3475
    """
3476
    instance = self.op.instance_name
3477
    pnode_name = self.pnode.name
3478

    
3479
    if self.op.mac == "auto":
3480
      mac_address = self.cfg.GenerateMAC()
3481
    else:
3482
      mac_address = self.op.mac
3483

    
3484
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3485
    if self.inst_ip is not None:
3486
      nic.ip = self.inst_ip
3487

    
3488
    ht_kind = self.cfg.GetHypervisorType()
3489
    if ht_kind in constants.HTS_REQ_PORT:
3490
      network_port = self.cfg.AllocatePort()
3491
    else:
3492
      network_port = None
3493

    
3494
    if self.op.vnc_bind_address is None:
3495
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3496

    
3497
    # this is needed because os.path.join does not accept None arguments
3498
    if self.op.file_storage_dir is None:
3499
      string_file_storage_dir = ""
3500
    else:
3501
      string_file_storage_dir = self.op.file_storage_dir
3502

    
3503
    # build the full file storage dir path
3504
    file_storage_dir = os.path.normpath(os.path.join(
3505
                                        self.cfg.GetFileStorageDir(),
3506
                                        string_file_storage_dir, instance))
3507

    
3508

    
3509
    disks = _GenerateDiskTemplate(self.cfg,
3510
                                  self.op.disk_template,
3511
                                  instance, pnode_name,
3512
                                  self.secondaries, self.op.disk_size,
3513
                                  self.op.swap_size,
3514
                                  file_storage_dir,
3515
                                  self.op.file_driver)
3516

    
3517
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3518
                            primary_node=pnode_name,
3519
                            memory=self.op.mem_size,
3520
                            vcpus=self.op.vcpus,
3521
                            nics=[nic], disks=disks,
3522
                            disk_template=self.op.disk_template,
3523
                            status=self.instance_status,
3524
                            network_port=network_port,
3525
                            kernel_path=self.op.kernel_path,
3526
                            initrd_path=self.op.initrd_path,
3527
                            hvm_boot_order=self.op.hvm_boot_order,
3528
                            hvm_acpi=self.op.hvm_acpi,
3529
                            hvm_pae=self.op.hvm_pae,
3530
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3531
                            vnc_bind_address=self.op.vnc_bind_address,
3532
                            hvm_nic_type=self.op.hvm_nic_type,
3533
                            hvm_disk_type=self.op.hvm_disk_type,
3534
                            )
3535

    
3536
    feedback_fn("* creating instance disks...")
3537
    if not _CreateDisks(self.cfg, iobj):
3538
      _RemoveDisks(iobj, self.cfg)
3539
      self.cfg.ReleaseDRBDMinors(instance)
3540
      raise errors.OpExecError("Device creation failed, reverting...")
3541

    
3542
    feedback_fn("adding instance %s to cluster config" % instance)
3543

    
3544
    self.cfg.AddInstance(iobj)
3545
    # Declare that we don't want to remove the instance lock anymore, as we've
3546
    # added the instance to the config
3547
    del self.remove_locks[locking.LEVEL_INSTANCE]
3548
    # Remove the temporary assignments for the instance's drbds
3549
    self.cfg.ReleaseDRBDMinors(instance)
3550

    
3551
    if self.op.wait_for_sync:
3552
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3553
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3554
      # make sure the disks are not degraded (still sync-ing is ok)
3555
      time.sleep(15)
3556
      feedback_fn("* checking mirrors status")
3557
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3558
    else:
3559
      disk_abort = False
3560

    
3561
    if disk_abort:
3562
      _RemoveDisks(iobj, self.cfg)
3563
      self.cfg.RemoveInstance(iobj.name)
3564
      # Make sure the instance lock gets removed
3565
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3566
      raise errors.OpExecError("There are some degraded disks for"
3567
                               " this instance")
3568

    
3569
    feedback_fn("creating os for instance %s on node %s" %
3570
                (instance, pnode_name))
3571

    
3572
    if iobj.disk_template != constants.DT_DISKLESS:
3573
      if self.op.mode == constants.INSTANCE_CREATE:
3574
        feedback_fn("* running the instance OS create scripts...")
3575
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3576
          raise errors.OpExecError("could not add os for instance %s"
3577
                                   " on node %s" %
3578
                                   (instance, pnode_name))
3579

    
3580
      elif self.op.mode == constants.INSTANCE_IMPORT:
3581
        feedback_fn("* running the instance OS import scripts...")
3582
        src_node = self.op.src_node
3583
        src_image = self.src_image
3584
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3585
                                                src_node, src_image):
3586
          raise errors.OpExecError("Could not import os for instance"
3587
                                   " %s on node %s" %
3588
                                   (instance, pnode_name))
3589
      else:
3590
        # also checked in the prereq part
3591
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3592
                                     % self.op.mode)
3593

    
3594
    if self.op.start:
3595
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3596
      feedback_fn("* starting instance...")
3597
      if not rpc.call_instance_start(pnode_name, iobj, None):
3598
        raise errors.OpExecError("Could not start instance")
3599

    
3600

    
3601
class LUConnectConsole(NoHooksLU):
3602
  """Connect to an instance's console.
3603

3604
  This is somewhat special in that it returns the command line that
3605
  you need to run on the master node in order to connect to the
3606
  console.
3607

3608
  """
3609
  _OP_REQP = ["instance_name"]
3610
  REQ_BGL = False
3611

    
3612
  def ExpandNames(self):
3613
    self._ExpandAndLockInstance()
3614

    
3615
  def CheckPrereq(self):
3616
    """Check prerequisites.
3617

3618
    This checks that the instance is in the cluster.
3619

3620
    """
3621
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3622
    assert self.instance is not None, \
3623
      "Cannot retrieve locked instance %s" % self.op.instance_name
3624

    
3625
  def Exec(self, feedback_fn):
3626
    """Connect to the console of an instance
3627

3628
    """
3629
    instance = self.instance
3630
    node = instance.primary_node
3631

    
3632
    node_insts = rpc.call_instance_list([node])[node]
3633
    if node_insts is False:
3634
      raise errors.OpExecError("Can't connect to node %s." % node)
3635

    
3636
    if instance.name not in node_insts:
3637
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3638

    
3639
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3640

    
3641
    hyper = hypervisor.GetHypervisor(self.cfg)
3642
    console_cmd = hyper.GetShellCommandForConsole(instance)
3643

    
3644
    # build ssh cmdline
3645
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3646

    
3647

    
3648
class LUReplaceDisks(LogicalUnit):
3649
  """Replace the disks of an instance.
3650

3651
  """
3652
  HPATH = "mirrors-replace"
3653
  HTYPE = constants.HTYPE_INSTANCE
3654
  _OP_REQP = ["instance_name", "mode", "disks"]
3655
  REQ_BGL = False
3656

    
3657
  def ExpandNames(self):
3658
    self._ExpandAndLockInstance()
3659

    
3660
    if not hasattr(self.op, "remote_node"):
3661
      self.op.remote_node = None
3662

    
3663
    ia_name = getattr(self.op, "iallocator", None)
3664
    if ia_name is not None:
3665
      if self.op.remote_node is not None:
3666
        raise errors.OpPrereqError("Give either the iallocator or the new"
3667
                                   " secondary, not both")
3668
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3669
    elif self.op.remote_node is not None:
3670
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3671
      if remote_node is None:
3672
        raise errors.OpPrereqError("Node '%s' not known" %
3673
                                   self.op.remote_node)
3674
      self.op.remote_node = remote_node
3675
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3676
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3677
    else:
3678
      self.needed_locks[locking.LEVEL_NODE] = []
3679
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3680

    
3681
  def DeclareLocks(self, level):
3682
    # If we're not already locking all nodes in the set we have to declare the
3683
    # instance's primary/secondary nodes.
3684
    if (level == locking.LEVEL_NODE and
3685
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3686
      self._LockInstancesNodes()
3687

    
3688
  def _RunAllocator(self):
3689
    """Compute a new secondary node using an IAllocator.
3690

3691
    """
3692
    ial = IAllocator(self.cfg,
3693
                     mode=constants.IALLOCATOR_MODE_RELOC,
3694
                     name=self.op.instance_name,
3695
                     relocate_from=[self.sec_node])
3696

    
3697
    ial.Run(self.op.iallocator)
3698

    
3699
    if not ial.success:
3700
      raise errors.OpPrereqError("Can't compute nodes using"
3701
                                 " iallocator '%s': %s" % (self.op.iallocator,
3702
                                                           ial.info))
3703
    if len(ial.nodes) != ial.required_nodes:
3704
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3705
                                 " of nodes (%s), required %s" %
3706
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3707
    self.op.remote_node = ial.nodes[0]
3708
    logger.ToStdout("Selected new secondary for the instance: %s" %
3709
                    self.op.remote_node)
3710

    
3711
  def BuildHooksEnv(self):
3712
    """Build hooks env.
3713

3714
    This runs on the master, the primary and all the secondaries.
3715

3716
    """
3717
    env = {
3718
      "MODE": self.op.mode,
3719
      "NEW_SECONDARY": self.op.remote_node,
3720
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3721
      }
3722
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3723
    nl = [
3724
      self.cfg.GetMasterNode(),
3725
      self.instance.primary_node,
3726
      ]
3727
    if self.op.remote_node is not None:
3728
      nl.append(self.op.remote_node)
3729
    return env, nl, nl
3730

    
3731
  def CheckPrereq(self):
3732
    """Check prerequisites.
3733

3734
    This checks that the instance is in the cluster.
3735

3736
    """
3737
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3738
    assert instance is not None, \
3739
      "Cannot retrieve locked instance %s" % self.op.instance_name
3740
    self.instance = instance
3741

    
3742
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3743
      raise errors.OpPrereqError("Instance's disk layout is not"
3744
                                 " network mirrored.")
3745

    
3746
    if len(instance.secondary_nodes) != 1:
3747
      raise errors.OpPrereqError("The instance has a strange layout,"
3748
                                 " expected one secondary but found %d" %
3749
                                 len(instance.secondary_nodes))
3750

    
3751
    self.sec_node = instance.secondary_nodes[0]
3752

    
3753
    ia_name = getattr(self.op, "iallocator", None)
3754
    if ia_name is not None:
3755
      self._RunAllocator()
3756

    
3757
    remote_node = self.op.remote_node
3758
    if remote_node is not None:
3759
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3760
      assert self.remote_node_info is not None, \
3761
        "Cannot retrieve locked node %s" % remote_node
3762
    else:
3763
      self.remote_node_info = None
3764
    if remote_node == instance.primary_node:
3765
      raise errors.OpPrereqError("The specified node is the primary node of"
3766
                                 " the instance.")
3767
    elif remote_node == self.sec_node:
3768
      if self.op.mode == constants.REPLACE_DISK_SEC:
3769
        # this is for DRBD8, where we can't execute the same mode of
3770
        # replacement as for drbd7 (no different port allocated)
3771
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3772
                                   " replacement")
3773
    if instance.disk_template == constants.DT_DRBD8:
3774
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3775
          remote_node is not None):
3776
        # switch to replace secondary mode
3777
        self.op.mode = constants.REPLACE_DISK_SEC
3778

    
3779
      if self.op.mode == constants.REPLACE_DISK_ALL:
3780
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3781
                                   " secondary disk replacement, not"
3782
                                   " both at once")
3783
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3784
        if remote_node is not None:
3785
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3786
                                     " the secondary while doing a primary"
3787
                                     " node disk replacement")
3788
        self.tgt_node = instance.primary_node
3789
        self.oth_node = instance.secondary_nodes[0]
3790
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3791
        self.new_node = remote_node # this can be None, in which case
3792
                                    # we don't change the secondary
3793
        self.tgt_node = instance.secondary_nodes[0]
3794
        self.oth_node = instance.primary_node
3795
      else:
3796
        raise errors.ProgrammerError("Unhandled disk replace mode")
3797

    
3798
    for name in self.op.disks:
3799
      if instance.FindDisk(name) is None:
3800
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3801
                                   (name, instance.name))
3802

    
3803
  def _ExecD8DiskOnly(self, feedback_fn):
3804
    """Replace a disk on the primary or secondary for dbrd8.
3805

3806
    The algorithm for replace is quite complicated:
3807
      - for each disk to be replaced:
3808
        - create new LVs on the target node with unique names
3809
        - detach old LVs from the drbd device
3810
        - rename old LVs to name_replaced.<time_t>
3811
        - rename new LVs to old LVs
3812
        - attach the new LVs (with the old names now) to the drbd device
3813
      - wait for sync across all devices
3814
      - for each modified disk:
3815
        - remove old LVs (which have the name name_replaced.<time_t>)
3816

3817
    Failures are not very well handled.
3818

3819
    """
3820
    steps_total = 6
3821
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3822
    instance = self.instance
3823
    iv_names = {}
3824
    vgname = self.cfg.GetVGName()
3825
    # start of work
3826
    cfg = self.cfg
3827
    tgt_node = self.tgt_node
3828
    oth_node = self.oth_node
3829

    
3830
    # Step: check device activation
3831
    self.proc.LogStep(1, steps_total, "check device existence")
3832
    info("checking volume groups")
3833
    my_vg = cfg.GetVGName()
3834
    results = rpc.call_vg_list([oth_node, tgt_node])
3835
    if not results:
3836
      raise errors.OpExecError("Can't list volume groups on the nodes")
3837
    for node in oth_node, tgt_node:
3838
      res = results.get(node, False)
3839
      if not res or my_vg not in res:
3840
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3841
                                 (my_vg, node))
3842
    for dev in instance.disks:
3843
      if not dev.iv_name in self.op.disks:
3844
        continue
3845
      for node in tgt_node, oth_node:
3846
        info("checking %s on %s" % (dev.iv_name, node))
3847
        cfg.SetDiskID(dev, node)
3848
        if not rpc.call_blockdev_find(node, dev):
3849
          raise errors.OpExecError("Can't find device %s on node %s" %
3850
                                   (dev.iv_name, node))
3851

    
3852
    # Step: check other node consistency
3853
    self.proc.LogStep(2, steps_total, "check peer consistency")
3854
    for dev in instance.disks:
3855
      if not dev.iv_name in self.op.disks:
3856
        continue
3857
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3858
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3859
                                   oth_node==instance.primary_node):
3860
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3861
                                 " to replace disks on this node (%s)" %
3862
                                 (oth_node, tgt_node))
3863

    
3864
    # Step: create new storage
3865
    self.proc.LogStep(3, steps_total, "allocate new storage")
3866
    for dev in instance.disks:
3867
      if not dev.iv_name in self.op.disks:
3868
        continue
3869
      size = dev.size
3870
      cfg.SetDiskID(dev, tgt_node)
3871
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3872
      names = _GenerateUniqueNames(cfg, lv_names)
3873
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3874
                             logical_id=(vgname, names[0]))
3875
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3876
                             logical_id=(vgname, names[1]))
3877
      new_lvs = [lv_data, lv_meta]
3878
      old_lvs = dev.children
3879
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3880
      info("creating new local storage on %s for %s" %
3881
           (tgt_node, dev.iv_name))
3882
      # since we *always* want to create this LV, we use the
3883
      # _Create...OnPrimary (which forces the creation), even if we
3884
      # are talking about the secondary node
3885
      for new_lv in new_lvs:
3886
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3887
                                        _GetInstanceInfoText(instance)):
3888
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3889
                                   " node '%s'" %
3890
                                   (new_lv.logical_id[1], tgt_node))
3891

    
3892
    # Step: for each lv, detach+rename*2+attach
3893
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3894
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3895
      info("detaching %s drbd from local storage" % dev.iv_name)
3896
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3897
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3898
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3899
      #dev.children = []
3900
      #cfg.Update(instance)
3901

    
3902
      # ok, we created the new LVs, so now we know we have the needed
3903
      # storage; as such, we proceed on the target node to rename
3904
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3905
      # using the assumption that logical_id == physical_id (which in
3906
      # turn is the unique_id on that node)
3907

    
3908
      # FIXME(iustin): use a better name for the replaced LVs
3909
      temp_suffix = int(time.time())
3910
      ren_fn = lambda d, suff: (d.physical_id[0],
3911
                                d.physical_id[1] + "_replaced-%s" % suff)
3912
      # build the rename list based on what LVs exist on the node
3913
      rlist = []
3914
      for to_ren in old_lvs:
3915
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3916
        if find_res is not None: # device exists
3917
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3918

    
3919
      info("renaming the old LVs on the target node")
3920
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3921
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3922
      # now we rename the new LVs to the old LVs
3923
      info("renaming the new LVs on the target node")
3924
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3925
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3926
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3927

    
3928
      for old, new in zip(old_lvs, new_lvs):
3929
        new.logical_id = old.logical_id
3930
        cfg.SetDiskID(new, tgt_node)
3931

    
3932
      for disk in old_lvs:
3933
        disk.logical_id = ren_fn(disk, temp_suffix)
3934
        cfg.SetDiskID(disk, tgt_node)
3935

    
3936
      # now that the new lvs have the old name, we can add them to the device
3937
      info("adding new mirror component on %s" % tgt_node)
3938
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3939
        for new_lv in new_lvs:
3940
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3941
            warning("Can't rollback device %s", hint="manually cleanup unused"
3942
                    " logical volumes")
3943
        raise errors.OpExecError("Can't add local storage to drbd")
3944

    
3945
      dev.children = new_lvs
3946
      cfg.Update(instance)
3947

    
3948
    # Step: wait for sync
3949

    
3950
    # this can fail as the old devices are degraded and _WaitForSync
3951
    # does a combined result over all disks, so we don't check its
3952
    # return value
3953
    self.proc.LogStep(5, steps_total, "sync devices")
3954
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3955

    
3956
    # so check manually all the devices
3957
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3958
      cfg.SetDiskID(dev, instance.primary_node)
3959
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3960
      if is_degr:
3961
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3962

    
3963
    # Step: remove old storage
3964
    self.proc.LogStep(6, steps_total, "removing old storage")
3965
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3966
      info("remove logical volumes for %s" % name)
3967
      for lv in old_lvs:
3968
        cfg.SetDiskID(lv, tgt_node)
3969
        if not rpc.call_blockdev_remove(tgt_node, lv):
3970
          warning("Can't remove old LV", hint="manually remove unused LVs")
3971
          continue
3972

    
3973
  def _ExecD8Secondary(self, feedback_fn):
3974
    """Replace the secondary node for drbd8.
3975

3976
    The algorithm for replace is quite complicated:
3977
      - for all disks of the instance:
3978
        - create new LVs on the new node with same names
3979
        - shutdown the drbd device on the old secondary
3980
        - disconnect the drbd network on the primary
3981
        - create the drbd device on the new secondary
3982
        - network attach the drbd on the primary, using an artifice:
3983
          the drbd code for Attach() will connect to the network if it
3984
          finds a device which is connected to the good local disks but
3985
          not network enabled
3986
      - wait for sync across all devices
3987
      - remove all disks from the old secondary
3988

3989
    Failures are not very well handled.
3990

3991
    """
3992
    steps_total = 6
3993
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3994
    instance = self.instance
3995
    iv_names = {}
3996
    vgname = self.cfg.GetVGName()
3997
    # start of work
3998
    cfg = self.cfg
3999
    old_node = self.tgt_node
4000
    new_node = self.new_node
4001
    pri_node = instance.primary_node
4002

    
4003
    # Step: check device activation
4004
    self.proc.LogStep(1, steps_total, "check device existence")
4005
    info("checking volume groups")
4006
    my_vg = cfg.GetVGName()
4007
    results = rpc.call_vg_list([pri_node, new_node])
4008
    if not results:
4009
      raise errors.OpExecError("Can't list volume groups on the nodes")
4010
    for node in pri_node, new_node:
4011
      res = results.get(node, False)
4012
      if not res or my_vg not in res:
4013
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4014
                                 (my_vg, node))
4015
    for dev in instance.disks:
4016
      if not dev.iv_name in self.op.disks:
4017
        continue
4018
      info("checking %s on %s" % (dev.iv_name, pri_node))
4019
      cfg.SetDiskID(dev, pri_node)
4020
      if not rpc.call_blockdev_find(pri_node, dev):
4021
        raise errors.OpExecError("Can't find device %s on node %s" %
4022
                                 (dev.iv_name, pri_node))
4023

    
4024
    # Step: check other node consistency
4025
    self.proc.LogStep(2, steps_total, "check peer consistency")
4026
    for dev in instance.disks:
4027
      if not dev.iv_name in self.op.disks:
4028
        continue
4029
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4030
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4031
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4032
                                 " unsafe to replace the secondary" %
4033
                                 pri_node)
4034

    
4035
    # Step: create new storage
4036
    self.proc.LogStep(3, steps_total, "allocate new storage")
4037
    for dev in instance.disks:
4038
      size = dev.size
4039
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4040
      # since we *always* want to create this LV, we use the
4041
      # _Create...OnPrimary (which forces the creation), even if we
4042
      # are talking about the secondary node
4043
      for new_lv in dev.children:
4044
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4045
                                        _GetInstanceInfoText(instance)):
4046
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4047
                                   " node '%s'" %
4048
                                   (new_lv.logical_id[1], new_node))
4049

    
4050

    
4051
    # Step 4: drbd minors and drbd setup changes
4052
    # after this, we must manually remove the drbd minors on both the
4053
    # error and the success paths
4054
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4055
                                   instance.name)
4056
    logging.debug("Allocated minors %s" % (minors,))
4057
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4058
    for dev, new_minor in zip(instance.disks, minors):
4059
      size = dev.size
4060
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4061
      # create new devices on new_node
4062
      if pri_node == dev.logical_id[0]:
4063
        new_logical_id = (pri_node, new_node,
4064
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4065
                          dev.logical_id[5])
4066
      else:
4067
        new_logical_id = (new_node, pri_node,
4068
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4069
                          dev.logical_id[5])
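      # Illustrative sketch (not part of the module): a DRBD8 logical_id has
      # the form (node_a, node_b, port, minor_a, minor_b, shared_secret);
      # only the entries that referred to the old secondary are swapped for
      # new_node and new_minor, while the port and the shared secret are kept
      # so that the primary can later simply reconnect.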
4070
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4071
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4072
                    new_logical_id)
4073
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4074
                              logical_id=new_logical_id,
4075
                              children=dev.children)
4076
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4077
                                        new_drbd, False,
4078
                                      _GetInstanceInfoText(instance)):
4079
        self.cfg.ReleaseDRBDMinors(instance.name)
4080
        raise errors.OpExecError("Failed to create new DRBD on"
4081
                                 " node '%s'" % new_node)
4082

    
4083
    for dev in instance.disks:
4084
      # we have new devices, shutdown the drbd on the old secondary
4085
      info("shutting down drbd for %s on old node" % dev.iv_name)
4086
      cfg.SetDiskID(dev, old_node)
4087
      if not rpc.call_blockdev_shutdown(old_node, dev):
4088
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4089
                hint="Please cleanup this device manually as soon as possible")
4090

    
4091
    info("detaching primary drbds from the network (=> standalone)")
4092
    done = 0
4093
    for dev in instance.disks:
4094
      cfg.SetDiskID(dev, pri_node)
4095
      # set the network part of the physical (unique in bdev terms) id
4096
      # to None, meaning detach from network
4097
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4098
      # and 'find' the device, which will 'fix' it to match the
4099
      # standalone state
4100
      if rpc.call_blockdev_find(pri_node, dev):
4101
        done += 1
4102
      else:
4103
        warning("Failed to detach drbd %s from network, unusual case" %
4104
                dev.iv_name)
4105

    
4106
    if not done:
4107
      # no detaches succeeded (very unlikely)
4108
      self.cfg.ReleaseDRBDMinors(instance.name)
4109
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4110

    
4111
    # if we managed to detach at least one, we update all the disks of
4112
    # the instance to point to the new secondary
4113
    info("updating instance configuration")
4114
    for dev, _, new_logical_id in iv_names.itervalues():
4115
      dev.logical_id = new_logical_id
4116
      cfg.SetDiskID(dev, pri_node)
4117
    cfg.Update(instance)
4118
    # we can remove now the temp minors as now the new values are
4119
    # written to the config file (and therefore stable)
4120
    self.cfg.ReleaseDRBDMinors(instance.name)
4121

    
4122
    # and now perform the drbd attach
4123
    info("attaching primary drbds to new secondary (standalone => connected)")
4124
    failures = []
4125
    for dev in instance.disks:
4126
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4127
      # since the attach is smart, it's enough to 'find' the device,
4128
      # it will automatically activate the network, if the physical_id
4129
      # is correct
4130
      cfg.SetDiskID(dev, pri_node)
4131
      logging.debug("Disk to attach: %s", dev)
4132
      if not rpc.call_blockdev_find(pri_node, dev):
4133
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4134
                "please do a gnt-instance info to see the status of disks")
4135

    
4136
    # this can fail as the old devices are degraded and _WaitForSync
4137
    # does a combined result over all disks, so we don't check its
4138
    # return value
4139
    self.proc.LogStep(5, steps_total, "sync devices")
4140
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4141

    
4142
    # so check manually all the devices
4143
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4144
      cfg.SetDiskID(dev, pri_node)
4145
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4146
      if is_degr:
4147
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4148

    
4149
    self.proc.LogStep(6, steps_total, "removing old storage")
4150
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4151
      info("remove logical volumes for %s" % name)
4152
      for lv in old_lvs:
4153
        cfg.SetDiskID(lv, old_node)
4154
        if not rpc.call_blockdev_remove(old_node, lv):
4155
          warning("Can't remove LV on old secondary",
4156
                  hint="Cleanup stale volumes by hand")
4157

    
4158
  def Exec(self, feedback_fn):
4159
    """Execute disk replacement.
4160

4161
    This dispatches the disk replacement to the appropriate handler.
4162

4163
    """
4164
    instance = self.instance
4165

    
4166
    # Activate the instance disks if we're replacing them on a down instance
4167
    if instance.status == "down":
4168
      _StartInstanceDisks(self.cfg, instance, True)
4169

    
4170
    if instance.disk_template == constants.DT_DRBD8:
4171
      if self.op.remote_node is None:
4172
        fn = self._ExecD8DiskOnly
4173
      else:
4174
        fn = self._ExecD8Secondary
4175
    else:
4176
      raise errors.ProgrammerError("Unhandled disk replacement case")
4177

    
4178
    ret = fn(feedback_fn)
4179

    
4180
    # Deactivate the instance disks if we're replacing them on a down instance
4181
    if instance.status == "down":
4182
      _SafeShutdownInstanceDisks(instance, self.cfg)
4183

    
4184
    return ret
4185

    
4186

    
4187
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


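# Illustrative sketch (not part of the original module): the free-space check
# in LUGrowDisk.CheckPrereq compares the requested growth (in MiB) against the
# 'vg_free' value each node reports through rpc.call_node_info. The
# hypothetical helper below shows that comparison in isolation; it is not used
# by any LU in this file.
def _NodesWithoutVGSpace(nodeinfo, nodenames, amount):
  """Return the nodes whose reported 'vg_free' cannot accommodate amount MiB.

  nodeinfo is the dict returned by rpc.call_node_info, nodenames the nodes to
  check; nodes with missing or malformed data are also reported.

  """
  short = []
  for node in nodenames:
    info = nodeinfo.get(node, None)
    if not info or not isinstance(info.get('vg_free', None), int):
      short.append(node)
    elif amount > info['vg_free']:
      short.append(node)
  return short

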
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data."""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.cfg.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


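# Illustrative sketch (not part of the original module): the per-instance
# dictionary built by LUQueryInstanceData.Exec has roughly the shape below.
# All values are hypothetical examples; hypervisor-specific keys (kernel_path,
# hvm_*, vnc_*) are only added for the matching hypervisor type.
_EXAMPLE_INSTANCE_DATA = {
  "name": "instance1.example.com",
  "config_state": "up",
  "run_state": "up",
  "pnode": "node1.example.com",
  "snodes": ["node2.example.com"],
  "os": "debian-etch",
  "memory": 512,
  "nics": [("aa:00:00:11:22:33", "192.0.2.10", "xen-br0")],
  "disks": [],   # list of dicts as returned by _ComputeDiskStatus
  "vcpus": 1,
  }

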
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to secondary"
                           " node %s" % node)

    # Xen HVM device type checks
    if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


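# Illustrative sketch (not part of the original module): LUSetInstanceParams.Exec
# returns a list of (parameter, new value) pairs describing what was changed;
# the sample below is a hypothetical result of raising the memory and
# switching the bridge.
_EXAMPLE_MODIFY_RESULT = [
  ("mem", 1024),
  ("bridge", "xen-br1"),
  ]

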
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


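# Illustrative sketch (not part of the original module): the node->export-list
# mapping returned by LUQueryExports.Exec (via rpc.call_export_list) looks
# roughly like this; the node and instance names are hypothetical.
_EXAMPLE_EXPORT_LIST = {
  "node1.example.com": ["instance1.example.com", "instance2.example.com"],
  "node2.example.com": [],
  }

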
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs whose tag matches the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


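# Illustrative sketch (not part of the original module): LUSearchTags.Exec
# returns (path, tag) pairs, where the path encodes whether the tag lives on
# the cluster, an instance or a node. Hypothetical example output for a
# pattern such as "^web":
_EXAMPLE_TAG_SEARCH_RESULT = [
  ("/cluster", "webfarm"),
  ("/instances/instance1.example.com", "webserver"),
  ]

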
class LUAddTags(TagsLU):
  """Sets tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of each tag to be added.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, mode, name, **kwargs):
    self.cfg = cfg
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.cfg.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.cfg.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


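# Illustrative sketch (not part of the original module): the serialized
# document handed to an external allocator script contains the cluster data
# from _ComputeClusterData plus a "request" key, and the script must answer
# with a document carrying "success", "info" and "nodes" keys (checked by
# _ValidateResult). Both samples below are hypothetical.
_EXAMPLE_IALLOCATOR_REQUEST = {
  "type": "allocate",
  "name": "instance1.example.com",
  "disk_template": constants.DT_DRBD8,
  "tags": [],
  "os": "debian-etch",
  "vcpus": 1,
  "memory": 512,
  "disks": [{"size": 1024, "mode": "w"}, {"size": 256, "mode": "w"}],
  "disk_space_total": 1280,  # in the real code this comes from _ComputeDiskSize
  "nics": [{"mac": "aa:00:00:11:22:33", "ip": None, "bridge": "xen-br0"}],
  "required_nodes": 2,
  }

_EXAMPLE_IALLOCATOR_RESULT = {
  "success": True,
  "info": "allocation successful",
  "nodes": ["node1.example.com", "node2.example.com"],
  }

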
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result