root / lib / cmdlib.py @ c9e5c064

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
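
  A minimal, purely illustrative subclass following these rules might look
  like the sketch below (the name LUExampleNoop and its values are
  hypothetical, not an actual Ganeti LU):

    class LUExampleNoop(LogicalUnit):
      HPATH = "example-noop"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = []          # no opcode parameters are required

      # REQ_BGL is left at its default (True), so the base ExpandNames works

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.sstore.GetClusterName()}
        return env, [], []   # env, pre-hook nodes, post-hook nodes

      def CheckPrereq(self):
        pass                 # nothing to verify

      def Exec(self, feedback_fn):
        feedback_fn("doing nothing")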
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    # Used to force good behavior when calling helper functions
86
    self.recalculate_locks = {}
87
    self.__ssh = None
88

    
89
    for attr_name in self._OP_REQP:
90
      attr_val = getattr(op, attr_name, None)
91
      if attr_val is None:
92
        raise errors.OpPrereqError("Required parameter '%s' missing" %
93
                                   attr_name)
94

    
95
    if not self.cfg.IsCluster():
96
      raise errors.OpPrereqError("Cluster not initialized yet,"
97
                                 " use 'gnt-cluster init' first.")
98
    if self.REQ_MASTER:
99
      master = sstore.GetMasterNode()
100
      if master != utils.HostInfo().name:
101
        raise errors.OpPrereqError("Commands must be run on the master"
102
                                   " node %s" % master)
103

    
104
  def __GetSSH(self):
105
    """Returns the SshRunner object
106

107
    """
108
    if not self.__ssh:
109
      self.__ssh = ssh.SshRunner(self.sstore)
110
    return self.__ssh
111

    
112
  ssh = property(fget=__GetSSH)
113

    
114
  def ExpandNames(self):
115
    """Expand names for this LU.
116

117
    This method is called before starting to execute the opcode, and it should
118
    update all the parameters of the opcode to their canonical form (e.g. a
119
    short node name must be fully expanded after this method has successfully
120
    completed). This way locking, hooks, logging, etc. can work correctly.
121

122
    LUs which implement this method must also populate the self.needed_locks
123
    member, as a dict with lock levels as keys, and a list of needed lock names
124
    as values. Rules:
125
      - Use an empty dict if you don't need any lock
126
      - If you don't need any lock at a particular level omit that level
127
      - Don't put anything for the BGL level
128
      - If you want all locks at a level use None as a value
129
        (this reflects what LockSet does, and will be replaced before
130
        CheckPrereq with the full list of nodes that have been locked)
131

132
    If you need to share locks (rather than acquire them exclusively) at one
133
    level you can modify self.share_locks, setting a true value (usually 1) for
134
    that level. By default locks are not shared.
135

136
    Examples:
137
    # Acquire all nodes and one instance
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: None,
140
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141
    }
142
    # Acquire just two nodes
143
    self.needed_locks = {
144
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145
    }
146
    # Acquire no locks
147
    self.needed_locks = {} # No, you can't leave it to the default value None
148

149
    """
150
    # The implementation of this method is mandatory only if the new LU is
151
    # concurrent, so that old LUs don't need to be changed all at the same
152
    # time.
153
    if self.REQ_BGL:
154
      self.needed_locks = {} # Exclusive LUs don't need locks.
155
    else:
156
      raise NotImplementedError
157

    
158
  def DeclareLocks(self, level):
159
    """Declare LU locking needs for a level
160

161
    While most LUs can just declare their locking needs at ExpandNames time,
162
    sometimes there's the need to calculate some locks after having acquired
163
    the ones before. This function is called just before acquiring locks at a
164
    particular level, but after acquiring the ones at lower levels, and permits
165
    such calculations. It can be used to modify self.needed_locks, and by
166
    default it does nothing.
167

168
    This function is only called if you have something already set in
169
    self.needed_locks for the level.
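
    A typical (illustrative) implementation in an LU that locks instances
    first and then their nodes:

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()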
170

171
    @param level: Locking level which is going to be locked
172
    @type level: member of ganeti.locking.LEVELS
173

174
    """
175

    
176
  def CheckPrereq(self):
177
    """Check prerequisites for this LU.
178

179
    This method should check that the prerequisites for the execution
180
    of this LU are fulfilled. It can do internode communication, but
181
    it should be idempotent - no cluster or system changes are
182
    allowed.
183

184
    The method should raise errors.OpPrereqError in case something is
185
    not fulfilled. Its return value is ignored.
186

187
    This method should also update all the parameters of the opcode to
188
    their canonical form if it hasn't been done by ExpandNames before.
189

190
    """
191
    raise NotImplementedError
192

    
193
  def Exec(self, feedback_fn):
194
    """Execute the LU.
195

196
    This method should implement the actual work. It should raise
197
    errors.OpExecError for failures that are somewhat dealt with in
198
    code, or expected.
199

200
    """
201
    raise NotImplementedError
202

    
203
  def BuildHooksEnv(self):
204
    """Build hooks environment for this LU.
205

206
    This method should return a three-element tuple consisting of: a dict
207
    containing the environment that will be used for running the
208
    specific hook for this LU, a list of node names on which the hook
209
    should run before the execution, and a list of node names on which
210
    the hook should run after the execution.
211

212
    The keys of the dict must not be prefixed with 'GANETI_', as this will
213
    be handled in the hooks runner. Also note additional keys will be
214
    added by the hooks runner. If the LU doesn't define any
215
    environment, an empty dict (and not None) should be returned.
216

217
    An empty set of nodes should be returned as an empty list (and not None).
218

219
    Note that if the HPATH for a LU class is None, this function will
220
    not be called.
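
    As an illustrative sketch, a concrete implementation might end with
    something like:

      return env, [self.sstore.GetMasterNode()], self.cfg.GetNodeList()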
221

222
    """
223
    raise NotImplementedError
224

    
225
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226
    """Notify the LU about the results of its hooks.
227

228
    This method is called every time a hooks phase is executed, and notifies
229
    the Logical Unit about the hooks' result. The LU can then use it to alter
230
    its result based on the hooks.  By default the method does nothing and the
231
    previous result is passed back unchanged but any LU can define it if it
232
    wants to use the local cluster hook-scripts somehow.
233

234
    Args:
235
      phase: the hooks phase that has just been run
236
      hook_results: the results of the multi-node hooks rpc call
237
      feedback_fn: function to send feedback back to the caller
238
      lu_result: the previous result this LU had, or None in the PRE phase.
239

240
    """
241
    return lu_result
242

    
243
  def _ExpandAndLockInstance(self):
244
    """Helper function to expand and lock an instance.
245

246
    Many LUs that work on an instance take its name in self.op.instance_name
247
    and need to expand it and then declare the expanded name for locking. This
248
    function does it, and then updates self.op.instance_name to the expanded
249
    name. It also initializes needed_locks as a dict, if this hasn't been done
250
    before.
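
    Typical (illustrative) use from an LU's ExpandNames:

      def ExpandNames(self):
        self._ExpandAndLockInstance()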
251

252
    """
253
    if self.needed_locks is None:
254
      self.needed_locks = {}
255
    else:
256
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257
        "_ExpandAndLockInstance called with instance-level locks set"
258
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259
    if expanded_name is None:
260
      raise errors.OpPrereqError("Instance '%s' not known" %
261
                                  self.op.instance_name)
262
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263
    self.op.instance_name = expanded_name
264

    
265
  def _LockInstancesNodes(self):
266
    """Helper function to declare instances' nodes for locking.
267

268
    This function should be called after locking one or more instances to lock
269
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270
    with all primary or secondary nodes for instances already locked and
271
    present in self.needed_locks[locking.LEVEL_INSTANCE].
272

273
    It should be called from DeclareLocks, and for safety only works if
274
    self.recalculate_locks[locking.LEVEL_NODE] is set.
275

276
    In the future it may grow parameters to just lock some instance's nodes, or
277
    to just lock primaries or secondary nodes, if needed.
278

279
    It should be called from DeclareLocks in a way similar to:
280

281
    if level == locking.LEVEL_NODE:
282
      self._LockInstancesNodes()
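
    For this to work, ExpandNames must have marked node locks for
    recalculation, e.g. (illustrative; the helper only checks that the key
    is present):

      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = 1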
283

284
    """
285
    assert locking.LEVEL_NODE in self.recalculate_locks, \
286
      "_LockInstancesNodes helper function called with no nodes to recalculate"
287

    
288
    # TODO: check if we've really been called with the instance locks held
289

    
290
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
291
    # future we might want to have different behaviors depending on the value
292
    # of self.recalculate_locks[locking.LEVEL_NODE]
293
    wanted_nodes = []
294
    for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
295
      instance = self.context.cfg.GetInstanceInfo(instance_name)
296
      wanted_nodes.append(instance.primary_node)
297
      wanted_nodes.extend(instance.secondary_nodes)
298
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
299

    
300
    del self.recalculate_locks[locking.LEVEL_NODE]
301

    
302

    
303
class NoHooksLU(LogicalUnit):
304
  """Simple LU which runs no hooks.
305

306
  This LU is intended as a parent for other LogicalUnits which will
307
  run no hooks, in order to reduce duplicate code.
308

309
  """
310
  HPATH = None
311
  HTYPE = None
312

    
313

    
314
def _GetWantedNodes(lu, nodes):
315
  """Returns list of checked and expanded node names.
316

317
  Args:
318
    nodes: List of node names (strings); an empty list means all nodes
319

320
  """
321
  if not isinstance(nodes, list):
322
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
323

    
324
  if nodes:
325
    wanted = []
326

    
327
    for name in nodes:
328
      node = lu.cfg.ExpandNodeName(name)
329
      if node is None:
330
        raise errors.OpPrereqError("No such node name '%s'" % name)
331
      wanted.append(node)
332

    
333
  else:
334
    wanted = lu.cfg.GetNodeList()
335
  return utils.NiceSort(wanted)
336

    
337

    
338
def _GetWantedInstances(lu, instances):
339
  """Returns list of checked and expanded instance names.
340

341
  Args:
342
    instances: List of instance names (strings); an empty list means all instances
343

344
  """
345
  if not isinstance(instances, list):
346
    raise errors.OpPrereqError("Invalid argument type 'instances'")
347

    
348
  if instances:
349
    wanted = []
350

    
351
    for name in instances:
352
      instance = lu.cfg.ExpandInstanceName(name)
353
      if instance is None:
354
        raise errors.OpPrereqError("No such instance name '%s'" % name)
355
      wanted.append(instance)
356

    
357
  else:
358
    wanted = lu.cfg.GetInstanceList()
359
  return utils.NiceSort(wanted)
360

    
361

    
362
def _CheckOutputFields(static, dynamic, selected):
363
  """Checks whether all selected fields are valid.
364

365
  Args:
366
    static: Static fields
367
    dynamic: Dynamic fields
368

369
  """
370
  static_fields = frozenset(static)
371
  dynamic_fields = frozenset(dynamic)
372

    
373
  all_fields = static_fields | dynamic_fields
374

    
375
  if not all_fields.issuperset(selected):
376
    raise errors.OpPrereqError("Unknown output fields selected: %s"
377
                               % ",".join(frozenset(selected).
378
                                          difference(all_fields)))
379

    
380

    
381
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
382
                          memory, vcpus, nics):
383
  """Builds instance related env variables for hooks from single variables.
384

385
  Args:
386
    secondary_nodes: List of secondary nodes as strings
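
  For a single-NIC instance, the resulting dictionary would look roughly like
  this (all values illustrative):

    {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
     "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
     "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
     "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
     "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "br0",
     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
     "INSTANCE_NIC_COUNT": 1}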
387
  """
388
  env = {
389
    "OP_TARGET": name,
390
    "INSTANCE_NAME": name,
391
    "INSTANCE_PRIMARY": primary_node,
392
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
393
    "INSTANCE_OS_TYPE": os_type,
394
    "INSTANCE_STATUS": status,
395
    "INSTANCE_MEMORY": memory,
396
    "INSTANCE_VCPUS": vcpus,
397
  }
398

    
399
  if nics:
400
    nic_count = len(nics)
401
    for idx, (ip, bridge, mac) in enumerate(nics):
402
      if ip is None:
403
        ip = ""
404
      env["INSTANCE_NIC%d_IP" % idx] = ip
405
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
406
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
407
  else:
408
    nic_count = 0
409

    
410
  env["INSTANCE_NIC_COUNT"] = nic_count
411

    
412
  return env
413

    
414

    
415
def _BuildInstanceHookEnvByObject(instance, override=None):
416
  """Builds instance related env variables for hooks from an object.
417

418
  Args:
419
    instance: objects.Instance object of instance
420
    override: dict of values to override
421
  """
422
  args = {
423
    'name': instance.name,
424
    'primary_node': instance.primary_node,
425
    'secondary_nodes': instance.secondary_nodes,
426
    'os_type': instance.os,
427
    'status': instance.status,
428
    'memory': instance.memory,
429
    'vcpus': instance.vcpus,
430
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
431
  }
432
  if override:
433
    args.update(override)
434
  return _BuildInstanceHookEnv(**args)
435

    
436

    
437
def _CheckInstanceBridgesExist(instance):
438
  """Check that the brigdes needed by an instance exist.
439

440
  """
441
  # check bridges existence
442
  brlist = [nic.bridge for nic in instance.nics]
443
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
444
    raise errors.OpPrereqError("one or more target bridges %s does not"
445
                               " exist on destination node '%s'" %
446
                               (brlist, instance.primary_node))
447

    
448

    
449
class LUDestroyCluster(NoHooksLU):
450
  """Logical unit for destroying the cluster.
451

452
  """
453
  _OP_REQP = []
454

    
455
  def CheckPrereq(self):
456
    """Check prerequisites.
457

458
    This checks whether the cluster is empty.
459

460
    Any errors are signalled by raising errors.OpPrereqError.
461

462
    """
463
    master = self.sstore.GetMasterNode()
464

    
465
    nodelist = self.cfg.GetNodeList()
466
    if len(nodelist) != 1 or nodelist[0] != master:
467
      raise errors.OpPrereqError("There are still %d node(s) in"
468
                                 " this cluster." % (len(nodelist) - 1))
469
    instancelist = self.cfg.GetInstanceList()
470
    if instancelist:
471
      raise errors.OpPrereqError("There are still %d instance(s) in"
472
                                 " this cluster." % len(instancelist))
473

    
474
  def Exec(self, feedback_fn):
475
    """Destroys the cluster.
476

477
    """
478
    master = self.sstore.GetMasterNode()
479
    if not rpc.call_node_stop_master(master, False):
480
      raise errors.OpExecError("Could not disable the master role")
481
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
482
    utils.CreateBackup(priv_key)
483
    utils.CreateBackup(pub_key)
484
    rpc.call_node_leave_cluster(master)
485

    
486

    
487
class LUVerifyCluster(LogicalUnit):
488
  """Verifies the cluster status.
489

490
  """
491
  HPATH = "cluster-verify"
492
  HTYPE = constants.HTYPE_CLUSTER
493
  _OP_REQP = ["skip_checks"]
494

    
495
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
496
                  remote_version, feedback_fn):
497
    """Run multiple tests against a node.
498

499
    Test list:
500
      - compares ganeti version
501
      - checks vg existence and size > 20G
502
      - checks config file checksum
503
      - checks ssh to other nodes
504

505
    Args:
506
      node: name of the node to check
507
      file_list: required list of files
508
      local_cksum: dictionary of local files and their checksums
509

510
    """
511
    # compares ganeti version
512
    local_version = constants.PROTOCOL_VERSION
513
    if not remote_version:
514
      feedback_fn("  - ERROR: connection to %s failed" % (node))
515
      return True
516

    
517
    if local_version != remote_version:
518
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
519
                      (local_version, node, remote_version))
520
      return True
521

    
522
    # checks vg existence and size > 20G
523

    
524
    bad = False
525
    if not vglist:
526
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
527
                      (node,))
528
      bad = True
529
    else:
530
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
531
                                            constants.MIN_VG_SIZE)
532
      if vgstatus:
533
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
534
        bad = True
535

    
536
    # checks config file checksum
537
    # checks ssh to any
538

    
539
    if 'filelist' not in node_result:
540
      bad = True
541
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
542
    else:
543
      remote_cksum = node_result['filelist']
544
      for file_name in file_list:
545
        if file_name not in remote_cksum:
546
          bad = True
547
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
548
        elif remote_cksum[file_name] != local_cksum[file_name]:
549
          bad = True
550
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
551

    
552
    if 'nodelist' not in node_result:
553
      bad = True
554
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
555
    else:
556
      if node_result['nodelist']:
557
        bad = True
558
        for node in node_result['nodelist']:
559
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
560
                          (node, node_result['nodelist'][node]))
561
    if 'node-net-test' not in node_result:
562
      bad = True
563
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
564
    else:
565
      if node_result['node-net-test']:
566
        bad = True
567
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
568
        for node in nlist:
569
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
570
                          (node, node_result['node-net-test'][node]))
571

    
572
    hyp_result = node_result.get('hypervisor', None)
573
    if hyp_result is not None:
574
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
575
    return bad
576

    
577
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
578
                      node_instance, feedback_fn):
579
    """Verify an instance.
580

581
    This function checks to see if the required block devices are
582
    available on the instance's node.
583

584
    """
585
    bad = False
586

    
587
    node_current = instanceconfig.primary_node
588

    
589
    node_vol_should = {}
590
    instanceconfig.MapLVsByNode(node_vol_should)
591

    
592
    for node in node_vol_should:
593
      for volume in node_vol_should[node]:
594
        if node not in node_vol_is or volume not in node_vol_is[node]:
595
          feedback_fn("  - ERROR: volume %s missing on node %s" %
596
                          (volume, node))
597
          bad = True
598

    
599
    if not instanceconfig.status == 'down':
600
      if (node_current not in node_instance or
601
          not instance in node_instance[node_current]):
602
        feedback_fn("  - ERROR: instance %s not running on node %s" %
603
                        (instance, node_current))
604
        bad = True
605

    
606
    for node in node_instance:
607
      if (not node == node_current):
608
        if instance in node_instance[node]:
609
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
610
                          (instance, node))
611
          bad = True
612

    
613
    return bad
614

    
615
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
616
    """Verify if there are any unknown volumes in the cluster.
617

618
    The .os, .swap and backup volumes are ignored. All other volumes are
619
    reported as unknown.
620

621
    """
622
    bad = False
623

    
624
    for node in node_vol_is:
625
      for volume in node_vol_is[node]:
626
        if node not in node_vol_should or volume not in node_vol_should[node]:
627
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
628
                      (volume, node))
629
          bad = True
630
    return bad
631

    
632
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
633
    """Verify the list of running instances.
634

635
    This checks what instances are running but unknown to the cluster.
636

637
    """
638
    bad = False
639
    for node in node_instance:
640
      for runninginstance in node_instance[node]:
641
        if runninginstance not in instancelist:
642
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
643
                          (runninginstance, node))
644
          bad = True
645
    return bad
646

    
647
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
648
    """Verify N+1 Memory Resilience.
649

650
    Check that if one single node dies we can still start all the instances it
651
    was primary for.
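
    The check is driven by node_info[node]['sinst-by-pnode'], whose layout is
    (illustrative names):

      {'node1': ['inst1', 'inst2'], 'node3': ['inst4']}

    meaning this node is secondary for inst1/inst2 (primary node1) and for
    inst4 (primary node3), and must have enough free memory to start each
    group should its primary node fail.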
652

653
    """
654
    bad = False
655

    
656
    for node, nodeinfo in node_info.iteritems():
657
      # This code checks that every node which is now listed as secondary has
658
      # enough memory to host all instances it is supposed to, should a single
659
      # other node in the cluster fail.
660
      # FIXME: not ready for failover to an arbitrary node
661
      # FIXME: does not support file-backed instances
662
      # WARNING: we currently take into account down instances as well as up
663
      # ones, considering that even if they're down someone might want to start
664
      # them even in the event of a node failure.
665
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
666
        needed_mem = 0
667
        for instance in instances:
668
          needed_mem += instance_cfg[instance].memory
669
        if nodeinfo['mfree'] < needed_mem:
670
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
671
                      " failovers should node %s fail" % (node, prinode))
672
          bad = True
673
    return bad
674

    
675
  def CheckPrereq(self):
676
    """Check prerequisites.
677

678
    Transform the list of checks we're going to skip into a set and check that
679
    all its members are valid.
680

681
    """
682
    self.skip_set = frozenset(self.op.skip_checks)
683
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
684
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
685

    
686
  def BuildHooksEnv(self):
687
    """Build hooks env.
688

689
    Cluster-Verify hooks are run only in the post phase; their failure causes
690
    the output to be logged in the verify output and the verification to fail.
691

692
    """
693
    all_nodes = self.cfg.GetNodeList()
694
    # TODO: populate the environment with useful information for verify hooks
695
    env = {}
696
    return env, [], all_nodes
697

    
698
  def Exec(self, feedback_fn):
699
    """Verify integrity of cluster, performing various test on nodes.
700

701
    """
702
    bad = False
703
    feedback_fn("* Verifying global settings")
704
    for msg in self.cfg.VerifyConfig():
705
      feedback_fn("  - ERROR: %s" % msg)
706

    
707
    vg_name = self.cfg.GetVGName()
708
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
709
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
710
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
711
    i_non_redundant = [] # Non redundant instances
712
    node_volume = {}
713
    node_instance = {}
714
    node_info = {}
715
    instance_cfg = {}
716

    
717
    # FIXME: verify OS list
718
    # do local checksums
719
    file_names = list(self.sstore.GetFileList())
720
    file_names.append(constants.SSL_CERT_FILE)
721
    file_names.append(constants.CLUSTER_CONF_FILE)
722
    local_checksums = utils.FingerprintFiles(file_names)
723

    
724
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
725
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
726
    all_instanceinfo = rpc.call_instance_list(nodelist)
727
    all_vglist = rpc.call_vg_list(nodelist)
728
    node_verify_param = {
729
      'filelist': file_names,
730
      'nodelist': nodelist,
731
      'hypervisor': None,
732
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
733
                        for node in nodeinfo]
734
      }
735
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
736
    all_rversion = rpc.call_version(nodelist)
737
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
738

    
739
    for node in nodelist:
740
      feedback_fn("* Verifying node %s" % node)
741
      result = self._VerifyNode(node, file_names, local_checksums,
742
                                all_vglist[node], all_nvinfo[node],
743
                                all_rversion[node], feedback_fn)
744
      bad = bad or result
745

    
746
      # node_volume
747
      volumeinfo = all_volumeinfo[node]
748

    
749
      if isinstance(volumeinfo, basestring):
750
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
751
                    (node, volumeinfo[-400:].encode('string_escape')))
752
        bad = True
753
        node_volume[node] = {}
754
      elif not isinstance(volumeinfo, dict):
755
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
756
        bad = True
757
        continue
758
      else:
759
        node_volume[node] = volumeinfo
760

    
761
      # node_instance
762
      nodeinstance = all_instanceinfo[node]
763
      if type(nodeinstance) != list:
764
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
765
        bad = True
766
        continue
767

    
768
      node_instance[node] = nodeinstance
769

    
770
      # node_info
771
      nodeinfo = all_ninfo[node]
772
      if not isinstance(nodeinfo, dict):
773
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
774
        bad = True
775
        continue
776

    
777
      try:
778
        node_info[node] = {
779
          "mfree": int(nodeinfo['memory_free']),
780
          "dfree": int(nodeinfo['vg_free']),
781
          "pinst": [],
782
          "sinst": [],
783
          # dictionary holding all instances this node is secondary for,
784
          # grouped by their primary node. Each key is a cluster node, and each
785
          # value is a list of instances which have the key as primary and the
786
          # current node as secondary.  this is handy to calculate N+1 memory
787
          # availability if you can only failover from a primary to its
788
          # secondary.
789
          "sinst-by-pnode": {},
790
        }
791
      except ValueError:
792
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
793
        bad = True
794
        continue
795

    
796
    node_vol_should = {}
797

    
798
    for instance in instancelist:
799
      feedback_fn("* Verifying instance %s" % instance)
800
      inst_config = self.cfg.GetInstanceInfo(instance)
801
      result =  self._VerifyInstance(instance, inst_config, node_volume,
802
                                     node_instance, feedback_fn)
803
      bad = bad or result
804

    
805
      inst_config.MapLVsByNode(node_vol_should)
806

    
807
      instance_cfg[instance] = inst_config
808

    
809
      pnode = inst_config.primary_node
810
      if pnode in node_info:
811
        node_info[pnode]['pinst'].append(instance)
812
      else:
813
        feedback_fn("  - ERROR: instance %s, connection to primary node"
814
                    " %s failed" % (instance, pnode))
815
        bad = True
816

    
817
      # If the instance is non-redundant we cannot survive losing its primary
818
      # node, so we are not N+1 compliant. On the other hand we have no disk
819
      # templates with more than one secondary so that situation is not well
820
      # supported either.
821
      # FIXME: does not support file-backed instances
822
      if len(inst_config.secondary_nodes) == 0:
823
        i_non_redundant.append(instance)
824
      elif len(inst_config.secondary_nodes) > 1:
825
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
826
                    % instance)
827

    
828
      for snode in inst_config.secondary_nodes:
829
        if snode in node_info:
830
          node_info[snode]['sinst'].append(instance)
831
          if pnode not in node_info[snode]['sinst-by-pnode']:
832
            node_info[snode]['sinst-by-pnode'][pnode] = []
833
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
834
        else:
835
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
836
                      " %s failed" % (instance, snode))
837

    
838
    feedback_fn("* Verifying orphan volumes")
839
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
840
                                       feedback_fn)
841
    bad = bad or result
842

    
843
    feedback_fn("* Verifying remaining instances")
844
    result = self._VerifyOrphanInstances(instancelist, node_instance,
845
                                         feedback_fn)
846
    bad = bad or result
847

    
848
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
849
      feedback_fn("* Verifying N+1 Memory redundancy")
850
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
851
      bad = bad or result
852

    
853
    feedback_fn("* Other Notes")
854
    if i_non_redundant:
855
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
856
                  % len(i_non_redundant))
857

    
858
    return int(bad)
859

    
860
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
861
    """Analize the post-hooks' result, handle it, and send some
862
    nicely-formatted feedback back to the user.
863

864
    Args:
865
      phase: the hooks phase that has just been run
866
      hooks_results: the results of the multi-node hooks rpc call
867
      feedback_fn: function to send feedback back to the caller
868
      lu_result: previous Exec result
869

870
    """
871
    # We only really run POST phase hooks, and are only interested in
872
    # their results
873
    if phase == constants.HOOKS_PHASE_POST:
874
      # Used to change hooks' output to proper indentation
875
      indent_re = re.compile('^', re.M)
876
      feedback_fn("* Hooks Results")
877
      if not hooks_results:
878
        feedback_fn("  - ERROR: general communication failure")
879
        lu_result = 1
880
      else:
881
        for node_name in hooks_results:
882
          show_node_header = True
883
          res = hooks_results[node_name]
884
          if res is False or not isinstance(res, list):
885
            feedback_fn("    Communication failure")
886
            lu_result = 1
887
            continue
888
          for script, hkr, output in res:
889
            if hkr == constants.HKR_FAIL:
890
              # The node header is only shown once, if there are
891
              # failing hooks on that node
892
              if show_node_header:
893
                feedback_fn("  Node %s:" % node_name)
894
                show_node_header = False
895
              feedback_fn("    ERROR: Script %s failed, output:" % script)
896
              output = indent_re.sub('      ', output)
897
              feedback_fn("%s" % output)
898
              lu_result = 1
899

    
900
      return lu_result
901

    
902

    
903
class LUVerifyDisks(NoHooksLU):
904
  """Verifies the cluster disks status.
905

906
  """
907
  _OP_REQP = []
908

    
909
  def CheckPrereq(self):
910
    """Check prerequisites.
911

912
    This has no prerequisites.
913

914
    """
915
    pass
916

    
917
  def Exec(self, feedback_fn):
918
    """Verify integrity of cluster disks.
919

920
    """
921
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
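    # Result layout (descriptive summary of the code below): res_nodes are
    # nodes that could not be contacted, res_nlvm maps nodes to LVM error
    # output, res_instances lists instances with at least one non-online LV,
    # and res_missing maps instances to the (node, volume) pairs not found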
922

    
923
    vg_name = self.cfg.GetVGName()
924
    nodes = utils.NiceSort(self.cfg.GetNodeList())
925
    instances = [self.cfg.GetInstanceInfo(name)
926
                 for name in self.cfg.GetInstanceList()]
927

    
928
    nv_dict = {}
929
    for inst in instances:
930
      inst_lvs = {}
931
      if (inst.status != "up" or
932
          inst.disk_template not in constants.DTS_NET_MIRROR):
933
        continue
934
      inst.MapLVsByNode(inst_lvs)
935
      # transform { iname: {node: [vol,],},} to {(node, vol): inst}
936
      for node, vol_list in inst_lvs.iteritems():
937
        for vol in vol_list:
938
          nv_dict[(node, vol)] = inst
939

    
940
    if not nv_dict:
941
      return result
942

    
943
    node_lvs = rpc.call_volume_list(nodes, vg_name)
944

    
945
    to_act = set()
946
    for node in nodes:
947
      # node_volume
948
      lvs = node_lvs[node]
949

    
950
      if isinstance(lvs, basestring):
951
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
952
        res_nlvm[node] = lvs
953
      elif not isinstance(lvs, dict):
954
        logger.Info("connection to node %s failed or invalid data returned" %
955
                    (node,))
956
        res_nodes.append(node)
957
        continue
958

    
959
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
960
        inst = nv_dict.pop((node, lv_name), None)
961
        if (not lv_online and inst is not None
962
            and inst.name not in res_instances):
963
          res_instances.append(inst.name)
964

    
965
    # any leftover items in nv_dict are missing LVs, let's arrange the
966
    # data better
967
    for key, inst in nv_dict.iteritems():
968
      if inst.name not in res_missing:
969
        res_missing[inst.name] = []
970
      res_missing[inst.name].append(key)
971

    
972
    return result
973

    
974

    
975
class LURenameCluster(LogicalUnit):
976
  """Rename the cluster.
977

978
  """
979
  HPATH = "cluster-rename"
980
  HTYPE = constants.HTYPE_CLUSTER
981
  _OP_REQP = ["name"]
982
  REQ_WSSTORE = True
983

    
984
  def BuildHooksEnv(self):
985
    """Build hooks env.
986

987
    """
988
    env = {
989
      "OP_TARGET": self.sstore.GetClusterName(),
990
      "NEW_NAME": self.op.name,
991
      }
992
    mn = self.sstore.GetMasterNode()
993
    return env, [mn], [mn]
994

    
995
  def CheckPrereq(self):
996
    """Verify that the passed name is a valid one.
997

998
    """
999
    hostname = utils.HostInfo(self.op.name)
1000

    
1001
    new_name = hostname.name
1002
    self.ip = new_ip = hostname.ip
1003
    old_name = self.sstore.GetClusterName()
1004
    old_ip = self.sstore.GetMasterIP()
1005
    if new_name == old_name and new_ip == old_ip:
1006
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1007
                                 " cluster has changed")
1008
    if new_ip != old_ip:
1009
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1010
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1011
                                   " reachable on the network. Aborting." %
1012
                                   new_ip)
1013

    
1014
    self.op.name = new_name
1015

    
1016
  def Exec(self, feedback_fn):
1017
    """Rename the cluster.
1018

1019
    """
1020
    clustername = self.op.name
1021
    ip = self.ip
1022
    ss = self.sstore
1023

    
1024
    # shutdown the master IP
1025
    master = ss.GetMasterNode()
1026
    if not rpc.call_node_stop_master(master, False):
1027
      raise errors.OpExecError("Could not disable the master role")
1028

    
1029
    try:
1030
      # modify the sstore
1031
      ss.SetKey(ss.SS_MASTER_IP, ip)
1032
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1033

    
1034
      # Distribute updated ss config to all nodes
1035
      myself = self.cfg.GetNodeInfo(master)
1036
      dist_nodes = self.cfg.GetNodeList()
1037
      if myself.name in dist_nodes:
1038
        dist_nodes.remove(myself.name)
1039

    
1040
      logger.Debug("Copying updated ssconf data to all nodes")
1041
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1042
        fname = ss.KeyToFilename(keyname)
1043
        result = rpc.call_upload_file(dist_nodes, fname)
1044
        for to_node in dist_nodes:
1045
          if not result[to_node]:
1046
            logger.Error("copy of file %s to node %s failed" %
1047
                         (fname, to_node))
1048
    finally:
1049
      if not rpc.call_node_start_master(master, False):
1050
        logger.Error("Could not re-enable the master role on the master,"
1051
                     " please restart manually.")
1052

    
1053

    
1054
def _RecursiveCheckIfLVMBased(disk):
1055
  """Check if the given disk or its children are lvm-based.
1056

1057
  Args:
1058
    disk: ganeti.objects.Disk object
1059

1060
  Returns:
1061
    boolean indicating whether a LD_LV dev_type was found or not
1062

1063
  """
1064
  if disk.children:
1065
    for chdisk in disk.children:
1066
      if _RecursiveCheckIfLVMBased(chdisk):
1067
        return True
1068
  return disk.dev_type == constants.LD_LV
1069

    
1070

    
1071
class LUSetClusterParams(LogicalUnit):
1072
  """Change the parameters of the cluster.
1073

1074
  """
1075
  HPATH = "cluster-modify"
1076
  HTYPE = constants.HTYPE_CLUSTER
1077
  _OP_REQP = []
1078

    
1079
  def BuildHooksEnv(self):
1080
    """Build hooks env.
1081

1082
    """
1083
    env = {
1084
      "OP_TARGET": self.sstore.GetClusterName(),
1085
      "NEW_VG_NAME": self.op.vg_name,
1086
      }
1087
    mn = self.sstore.GetMasterNode()
1088
    return env, [mn], [mn]
1089

    
1090
  def CheckPrereq(self):
1091
    """Check prerequisites.
1092

1093
    This checks whether the given params don't conflict and
1094
    if the given volume group is valid.
1095

1096
    """
1097
    if not self.op.vg_name:
1098
      instances = [self.cfg.GetInstanceInfo(name)
1099
                   for name in self.cfg.GetInstanceList()]
1100
      for inst in instances:
1101
        for disk in inst.disks:
1102
          if _RecursiveCheckIfLVMBased(disk):
1103
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1104
                                       " lvm-based instances exist")
1105

    
1106
    # if vg_name not None, checks given volume group on all nodes
1107
    if self.op.vg_name:
1108
      node_list = self.cfg.GetNodeList()
1109
      vglist = rpc.call_vg_list(node_list)
1110
      for node in node_list:
1111
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1112
                                              constants.MIN_VG_SIZE)
1113
        if vgstatus:
1114
          raise errors.OpPrereqError("Error on node '%s': %s" %
1115
                                     (node, vgstatus))
1116

    
1117
  def Exec(self, feedback_fn):
1118
    """Change the parameters of the cluster.
1119

1120
    """
1121
    if self.op.vg_name != self.cfg.GetVGName():
1122
      self.cfg.SetVGName(self.op.vg_name)
1123
    else:
1124
      feedback_fn("Cluster LVM configuration already in desired"
1125
                  " state, not changing")
1126

    
1127

    
1128
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1129
  """Sleep and poll for an instance's disk to sync.
1130

1131
  """
1132
  if not instance.disks:
1133
    return True
1134

    
1135
  if not oneshot:
1136
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1137

    
1138
  node = instance.primary_node
1139

    
1140
  for dev in instance.disks:
1141
    cfgw.SetDiskID(dev, node)
1142

    
1143
  retries = 0
1144
  while True:
1145
    max_time = 0
1146
    done = True
1147
    cumul_degraded = False
1148
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
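    # rstats is expected to hold one entry per disk; each entry is either None
    # or a (sync_percent, estimated_time, is_degraded, ldisk) tuple, unpacked
    # further below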
1149
    if not rstats:
1150
      proc.LogWarning("Can't get any data from node %s" % node)
1151
      retries += 1
1152
      if retries >= 10:
1153
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1154
                                 " aborting." % node)
1155
      time.sleep(6)
1156
      continue
1157
    retries = 0
1158
    for i in range(len(rstats)):
1159
      mstat = rstats[i]
1160
      if mstat is None:
1161
        proc.LogWarning("Can't compute data for node %s/%s" %
1162
                        (node, instance.disks[i].iv_name))
1163
        continue
1164
      # we ignore the ldisk parameter
1165
      perc_done, est_time, is_degraded, _ = mstat
1166
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1167
      if perc_done is not None:
1168
        done = False
1169
        if est_time is not None:
1170
          rem_time = "%d estimated seconds remaining" % est_time
1171
          max_time = est_time
1172
        else:
1173
          rem_time = "no time estimate"
1174
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1175
                     (instance.disks[i].iv_name, perc_done, rem_time))
1176
    if done or oneshot:
1177
      break
1178

    
1179
    time.sleep(min(60, max_time))
1180

    
1181
  if done:
1182
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1183
  return not cumul_degraded
1184

    
1185

    
1186
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1187
  """Check that mirrors are not degraded.
1188

1189
  The ldisk parameter, if True, will change the test from the
1190
  is_degraded attribute (which represents overall non-ok status for
1191
  the device(s)) to the ldisk (representing the local storage status).
1192

1193
  """
1194
  cfgw.SetDiskID(dev, node)
1195
  if ldisk:
1196
    idx = 6
1197
  else:
1198
    idx = 5
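  # In the status tuple returned by rpc.call_blockdev_find, index 5 is the
  # overall is_degraded flag and index 6 the ldisk (local storage) status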
1199

    
1200
  result = True
1201
  if on_primary or dev.AssembleOnSecondary():
1202
    rstats = rpc.call_blockdev_find(node, dev)
1203
    if not rstats:
1204
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1205
      result = False
1206
    else:
1207
      result = result and (not rstats[idx])
1208
  if dev.children:
1209
    for child in dev.children:
1210
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1211

    
1212
  return result
1213

    
1214

    
1215
class LUDiagnoseOS(NoHooksLU):
1216
  """Logical unit for OS diagnose/query.
1217

1218
  """
1219
  _OP_REQP = ["output_fields", "names"]
1220

    
1221
  def CheckPrereq(self):
1222
    """Check prerequisites.
1223

1224
    This always succeeds, since this is a pure query LU.
1225

1226
    """
1227
    if self.op.names:
1228
      raise errors.OpPrereqError("Selective OS query not supported")
1229

    
1230
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1231
    _CheckOutputFields(static=[],
1232
                       dynamic=self.dynamic_fields,
1233
                       selected=self.op.output_fields)
1234

    
1235
  @staticmethod
1236
  def _DiagnoseByOS(node_list, rlist):
1237
    """Remaps a per-node return list into an a per-os per-node dictionary
1238

1239
      Args:
1240
        node_list: a list with the names of all nodes
1241
        rlist: a map with node names as keys and OS objects as values
1242

1243
      Returns:
1244
        map: a map with osnames as keys and as value another map, with
1245
             nodes as
1246
             keys and list of OS objects as values
1247
             e.g. {"debian-etch": {"node1": [<object>,...],
1248
                                   "node2": [<object>,]}
1249
                  }
1250

1251
    """
1252
    all_os = {}
1253
    for node_name, nr in rlist.iteritems():
1254
      if not nr:
1255
        continue
1256
      for os_obj in nr:
1257
        if os_obj.name not in all_os:
1258
          # build a list of nodes for this os containing empty lists
1259
          # for each node in node_list
1260
          all_os[os_obj.name] = {}
1261
          for nname in node_list:
1262
            all_os[os_obj.name][nname] = []
1263
        all_os[os_obj.name][node_name].append(os_obj)
1264
    return all_os
1265

    
1266
  def Exec(self, feedback_fn):
1267
    """Compute the list of OSes.
1268

1269
    """
1270
    node_list = self.cfg.GetNodeList()
1271
    node_data = rpc.call_os_diagnose(node_list)
1272
    if node_data == False:
1273
      raise errors.OpExecError("Can't gather the list of OSes")
1274
    pol = self._DiagnoseByOS(node_list, node_data)
1275
    output = []
1276
    for os_name, os_data in pol.iteritems():
1277
      row = []
1278
      for field in self.op.output_fields:
1279
        if field == "name":
1280
          val = os_name
1281
        elif field == "valid":
1282
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1283
        elif field == "node_status":
1284
          val = {}
1285
          for node_name, nos_list in os_data.iteritems():
1286
            val[node_name] = [(v.status, v.path) for v in nos_list]
1287
        else:
1288
          raise errors.ParameterError(field)
1289
        row.append(val)
1290
      output.append(row)
1291

    
1292
    return output
1293

    
1294

    
1295
class LURemoveNode(LogicalUnit):
1296
  """Logical unit for removing a node.
1297

1298
  """
1299
  HPATH = "node-remove"
1300
  HTYPE = constants.HTYPE_NODE
1301
  _OP_REQP = ["node_name"]
1302

    
1303
  def BuildHooksEnv(self):
1304
    """Build hooks env.
1305

1306
    This doesn't run on the target node in the pre phase as a failed
1307
    node would then be impossible to remove.
1308

1309
    """
1310
    env = {
1311
      "OP_TARGET": self.op.node_name,
1312
      "NODE_NAME": self.op.node_name,
1313
      }
1314
    all_nodes = self.cfg.GetNodeList()
1315
    all_nodes.remove(self.op.node_name)
1316
    return env, all_nodes, all_nodes
1317

    
1318
  def CheckPrereq(self):
1319
    """Check prerequisites.
1320

1321
    This checks:
1322
     - the node exists in the configuration
1323
     - it does not have primary or secondary instances
1324
     - it's not the master
1325

1326
    Any errors are signalled by raising errors.OpPrereqError.
1327

1328
    """
1329
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1330
    if node is None:
1331
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1332

    
1333
    instance_list = self.cfg.GetInstanceList()
1334

    
1335
    masternode = self.sstore.GetMasterNode()
1336
    if node.name == masternode:
1337
      raise errors.OpPrereqError("Node is the master node,"
1338
                                 " you need to failover first.")
1339

    
1340
    for instance_name in instance_list:
1341
      instance = self.cfg.GetInstanceInfo(instance_name)
1342
      if node.name == instance.primary_node:
1343
        raise errors.OpPrereqError("Instance %s still running on the node,"
1344
                                   " please remove first." % instance_name)
1345
      if node.name in instance.secondary_nodes:
1346
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1347
                                   " please remove first." % instance_name)
1348
    self.op.node_name = node.name
1349
    self.node = node
1350

    
1351
  def Exec(self, feedback_fn):
1352
    """Removes the node from the cluster.
1353

1354
    """
1355
    node = self.node
1356
    logger.Info("stopping the node daemon and removing configs from node %s" %
1357
                node.name)
1358

    
1359
    rpc.call_node_leave_cluster(node.name)
1360

    
1361
    logger.Info("Removing node %s from config" % node.name)
1362

    
1363
    self.cfg.RemoveNode(node.name)
1364
    # Remove the node from the Ganeti Lock Manager
1365
    self.context.glm.remove(locking.LEVEL_NODE, node.name)
1366

    
1367
    utils.RemoveHostFromEtcHosts(node.name)
1368

    
1369

    
1370
class LUQueryNodes(NoHooksLU):
1371
  """Logical unit for querying nodes.
1372

1373
  """
1374
  _OP_REQP = ["output_fields", "names"]
1375

    
1376
  def CheckPrereq(self):
1377
    """Check prerequisites.
1378

1379
    This checks that the fields required are valid output fields.
1380

1381
    """
1382
    self.dynamic_fields = frozenset([
1383
      "dtotal", "dfree",
1384
      "mtotal", "mnode", "mfree",
1385
      "bootid",
1386
      "ctotal",
1387
      ])
1388

    
1389
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1390
                               "pinst_list", "sinst_list",
1391
                               "pip", "sip", "tags"],
1392
                       dynamic=self.dynamic_fields,
1393
                       selected=self.op.output_fields)
1394

    
1395
    self.wanted = _GetWantedNodes(self, self.op.names)
1396

    
1397
  def Exec(self, feedback_fn):
1398
    """Computes the list of nodes and their attributes.
1399

1400
    """
1401
    nodenames = self.wanted
1402
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1403

    
1404
    # begin data gathering
1405

    
1406
    if self.dynamic_fields.intersection(self.op.output_fields):
1407
      live_data = {}
1408
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1409
      for name in nodenames:
1410
        nodeinfo = node_data.get(name, None)
1411
        if nodeinfo:
1412
          live_data[name] = {
1413
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1414
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1415
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1416
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1417
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1418
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1419
            "bootid": nodeinfo['bootid'],
1420
            }
1421
        else:
1422
          live_data[name] = {}
1423
    else:
1424
      live_data = dict.fromkeys(nodenames, {})
1425

    
1426
    node_to_primary = dict([(name, set()) for name in nodenames])
1427
    node_to_secondary = dict([(name, set()) for name in nodenames])
1428

    
1429
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1430
                             "sinst_cnt", "sinst_list"))
1431
    if inst_fields & frozenset(self.op.output_fields):
1432
      instancelist = self.cfg.GetInstanceList()
1433

    
1434
      for instance_name in instancelist:
1435
        inst = self.cfg.GetInstanceInfo(instance_name)
1436
        if inst.primary_node in node_to_primary:
1437
          node_to_primary[inst.primary_node].add(inst.name)
1438
        for secnode in inst.secondary_nodes:
1439
          if secnode in node_to_secondary:
1440
            node_to_secondary[secnode].add(inst.name)
1441

    
1442
    # end data gathering
1443

    
1444
    output = []
1445
    for node in nodelist:
1446
      node_output = []
1447
      for field in self.op.output_fields:
1448
        if field == "name":
1449
          val = node.name
1450
        elif field == "pinst_list":
1451
          val = list(node_to_primary[node.name])
1452
        elif field == "sinst_list":
1453
          val = list(node_to_secondary[node.name])
1454
        elif field == "pinst_cnt":
1455
          val = len(node_to_primary[node.name])
1456
        elif field == "sinst_cnt":
1457
          val = len(node_to_secondary[node.name])
1458
        elif field == "pip":
1459
          val = node.primary_ip
1460
        elif field == "sip":
1461
          val = node.secondary_ip
1462
        elif field == "tags":
1463
          val = list(node.GetTags())
1464
        elif field in self.dynamic_fields:
1465
          val = live_data[node.name].get(field, None)
1466
        else:
1467
          raise errors.ParameterError(field)
1468
        node_output.append(val)
1469
      output.append(node_output)
1470

    
1471
    return output
1472

    
1473

    
1474
class LUQueryNodeVolumes(NoHooksLU):
1475
  """Logical unit for getting volumes on node(s).
1476

1477
  """
1478
  _OP_REQP = ["nodes", "output_fields"]
1479

    
1480
  def CheckPrereq(self):
1481
    """Check prerequisites.
1482

1483
    This checks that the fields required are valid output fields.
1484

1485
    """
1486
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1487

    
1488
    _CheckOutputFields(static=["node"],
1489
                       dynamic=["phys", "vg", "name", "size", "instance"],
1490
                       selected=self.op.output_fields)
1491

    
1492

    
1493
  def Exec(self, feedback_fn):
1494
    """Computes the list of nodes and their attributes.
1495

1496
    """
1497
    nodenames = self.nodes
1498
    volumes = rpc.call_node_volumes(nodenames)
1499

    
1500
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1501
             in self.cfg.GetInstanceList()]
1502

    
1503
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1504

    
1505
    output = []
1506
    for node in nodenames:
1507
      if node not in volumes or not volumes[node]:
1508
        continue
1509

    
1510
      node_vols = volumes[node][:]
1511
      node_vols.sort(key=lambda vol: vol['dev'])
1512

    
1513
      for vol in node_vols:
1514
        node_output = []
1515
        for field in self.op.output_fields:
1516
          if field == "node":
1517
            val = node
1518
          elif field == "phys":
1519
            val = vol['dev']
1520
          elif field == "vg":
1521
            val = vol['vg']
1522
          elif field == "name":
1523
            val = vol['name']
1524
          elif field == "size":
1525
            val = int(float(vol['size']))
1526
          elif field == "instance":
1527
            for inst in ilist:
1528
              if node not in lv_by_node[inst]:
1529
                continue
1530
              if vol['name'] in lv_by_node[inst][node]:
1531
                val = inst.name
1532
                break
1533
            else:
1534
              val = '-'
1535
          else:
1536
            raise errors.ParameterError(field)
1537
          node_output.append(str(val))
1538

    
1539
        output.append(node_output)
1540

    
1541
    return output
1542

    
1543

    
1544
class LUAddNode(LogicalUnit):
1545
  """Logical unit for adding node to the cluster.
1546

1547
  """
1548
  HPATH = "node-add"
1549
  HTYPE = constants.HTYPE_NODE
1550
  _OP_REQP = ["node_name"]
1551

    
1552
  def BuildHooksEnv(self):
1553
    """Build hooks env.
1554

1555
    This will run on all nodes before, and on all nodes + the new node after.
1556

1557
    """
1558
    env = {
1559
      "OP_TARGET": self.op.node_name,
1560
      "NODE_NAME": self.op.node_name,
1561
      "NODE_PIP": self.op.primary_ip,
1562
      "NODE_SIP": self.op.secondary_ip,
1563
      }
1564
    nodes_0 = self.cfg.GetNodeList()
1565
    nodes_1 = nodes_0 + [self.op.node_name, ]
1566
    return env, nodes_0, nodes_1
1567

    
1568
  def CheckPrereq(self):
1569
    """Check prerequisites.
1570

1571
    This checks:
1572
     - the new node is not already in the config
1573
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

1576
    Any errors are signalled by raising errors.OpPrereqError.
1577

1578
    """
1579
    node_name = self.op.node_name
1580
    cfg = self.cfg
1581

    
1582
    dns_data = utils.HostInfo(node_name)
1583

    
1584
    node = dns_data.name
1585
    primary_ip = self.op.primary_ip = dns_data.ip
1586
    secondary_ip = getattr(self.op, "secondary_ip", None)
1587
    if secondary_ip is None:
1588
      secondary_ip = primary_ip
1589
    if not utils.IsValidIP(secondary_ip):
1590
      raise errors.OpPrereqError("Invalid secondary IP given")
1591
    self.op.secondary_ip = secondary_ip
1592

    
1593
    node_list = cfg.GetNodeList()
1594
    if not self.op.readd and node in node_list:
1595
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1596
                                 node)
1597
    elif self.op.readd and node not in node_list:
1598
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1599

    
1600
    for existing_node_name in node_list:
1601
      existing_node = cfg.GetNodeInfo(existing_node_name)
1602

    
1603
      if self.op.readd and node == existing_node_name:
1604
        if (existing_node.primary_ip != primary_ip or
1605
            existing_node.secondary_ip != secondary_ip):
1606
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1607
                                     " address configuration as before")
1608
        continue
1609

    
1610
      if (existing_node.primary_ip == primary_ip or
1611
          existing_node.secondary_ip == primary_ip or
1612
          existing_node.primary_ip == secondary_ip or
1613
          existing_node.secondary_ip == secondary_ip):
1614
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1615
                                   " existing node %s" % existing_node.name)
1616

    
1617
    # check that the type of the node (single versus dual homed) is the
1618
    # same as for the master
1619
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1620
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1621
    newbie_singlehomed = secondary_ip == primary_ip
1622
    if master_singlehomed != newbie_singlehomed:
1623
      if master_singlehomed:
1624
        raise errors.OpPrereqError("The master has no private ip but the"
1625
                                   " new node has one")
1626
      else:
1627
        raise errors.OpPrereqError("The master has a private ip but the"
1628
                                   " new node doesn't have one")
1629

    
    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1632
      raise errors.OpPrereqError("Node not reachable by ping")
1633

    
1634
    if not newbie_singlehomed:
1635
      # check reachability from my secondary ip to newbie's secondary ip
1636
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1637
                           source=myself.secondary_ip):
1638
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1639
                                   " based ping to noded port")
1640

    
1641
    self.new_node = objects.Node(name=node,
1642
                                 primary_ip=primary_ip,
1643
                                 secondary_ip=secondary_ip)
1644

    
1645
  def Exec(self, feedback_fn):
1646
    """Adds the new node to the cluster.
1647

1648
    """
1649
    new_node = self.new_node
1650
    node = new_node.name
1651

    
1652
    # check connectivity
1653
    result = rpc.call_version([node])[node]
1654
    if result:
1655
      if constants.PROTOCOL_VERSION == result:
1656
        logger.Info("communication to node %s fine, sw version %s match" %
1657
                    (node, result))
1658
      else:
1659
        raise errors.OpExecError("Version mismatch master version %s,"
1660
                                 " node version %s" %
1661
                                 (constants.PROTOCOL_VERSION, result))
1662
    else:
1663
      raise errors.OpExecError("Cannot get version from the new node")
1664

    
1665
    # setup ssh on node
1666
    logger.Info("copy ssh key to node %s" % node)
1667
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1668
    keyarray = []
1669
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1670
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1671
                priv_key, pub_key]
1672

    
1673
    for i in keyfiles:
1674
      f = open(i, 'r')
1675
      try:
1676
        keyarray.append(f.read())
1677
      finally:
1678
        f.close()
1679

    
1680
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1681
                               keyarray[3], keyarray[4], keyarray[5])
1682

    
1683
    if not result:
1684
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1685

    
1686
    # Add node to our /etc/hosts, and add key to known_hosts
1687
    utils.AddHostToEtcHosts(new_node.name)
1688

    
1689
    if new_node.secondary_ip != new_node.primary_ip:
1690
      if not rpc.call_node_tcp_ping(new_node.name,
1691
                                    constants.LOCALHOST_IP_ADDRESS,
1692
                                    new_node.secondary_ip,
1693
                                    constants.DEFAULT_NODED_PORT,
1694
                                    10, False):
1695
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1696
                                 " you gave (%s). Please fix and re-run this"
1697
                                 " command." % new_node.secondary_ip)
1698

    
1699
    node_verify_list = [self.sstore.GetMasterNode()]
1700
    node_verify_param = {
1701
      'nodelist': [node],
1702
      # TODO: do a node-net-test as well?
1703
    }
1704

    
1705
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1706
    for verifier in node_verify_list:
1707
      if not result[verifier]:
1708
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1709
                                 " for remote verification" % verifier)
1710
      if result[verifier]['nodelist']:
1711
        for failed in result[verifier]['nodelist']:
1712
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1713
                      (verifier, result[verifier]['nodelist'][failed]))
1714
        raise errors.OpExecError("ssh/hostname verification failed.")
1715

    
1716
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1717
    # including the node just added
1718
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1719
    dist_nodes = self.cfg.GetNodeList()
1720
    if not self.op.readd:
1721
      dist_nodes.append(node)
1722
    if myself.name in dist_nodes:
1723
      dist_nodes.remove(myself.name)
1724

    
1725
    logger.Debug("Copying hosts and known_hosts to all nodes")
1726
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1727
      result = rpc.call_upload_file(dist_nodes, fname)
1728
      for to_node in dist_nodes:
1729
        if not result[to_node]:
1730
          logger.Error("copy of file %s to node %s failed" %
1731
                       (fname, to_node))
1732

    
1733
    to_copy = self.sstore.GetFileList()
1734
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1735
      to_copy.append(constants.VNC_PASSWORD_FILE)
1736
    for fname in to_copy:
1737
      result = rpc.call_upload_file([node], fname)
1738
      if not result[node]:
1739
        logger.Error("could not copy file %s to node %s" % (fname, node))
1740

    
1741
    if not self.op.readd:
1742
      logger.Info("adding node %s to cluster.conf" % node)
1743
      self.cfg.AddNode(new_node)
1744
      # Add the new node to the Ganeti Lock Manager
1745
      self.context.glm.add(locking.LEVEL_NODE, node)
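

# Worked example for the single/dual-homed check in LUAddNode.CheckPrereq
# (hypothetical addresses): if the master has primary_ip == secondary_ip
# (single-homed) while the new node is given a distinct secondary IP
# (dual-homed), then master_singlehomed != newbie_singlehomed and the
# prerequisite check aborts with "The master has no private ip but the
# new node has one"; both nodes must use the same homing scheme.

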
class LUQueryClusterInfo(NoHooksLU):
1749
  """Query cluster configuration.
1750

1751
  """
1752
  _OP_REQP = []
1753
  REQ_MASTER = False
1754
  REQ_BGL = False
1755

    
1756
  def ExpandNames(self):
1757
    self.needed_locks = {}
1758

    
1759
  def CheckPrereq(self):
    """No prerequisites needed for this LU.

1762
    """
1763
    pass
1764

    
1765
  def Exec(self, feedback_fn):
1766
    """Return cluster config.
1767

1768
    """
1769
    result = {
1770
      "name": self.sstore.GetClusterName(),
1771
      "software_version": constants.RELEASE_VERSION,
1772
      "protocol_version": constants.PROTOCOL_VERSION,
1773
      "config_version": constants.CONFIG_VERSION,
1774
      "os_api_version": constants.OS_API_VERSION,
1775
      "export_version": constants.EXPORT_VERSION,
1776
      "master": self.sstore.GetMasterNode(),
1777
      "architecture": (platform.architecture()[0], platform.machine()),
1778
      "hypervisor_type": self.sstore.GetHypervisorType(),
1779
      }
1780

    
1781
    return result
1782

    
1783

    
1784
class LUDumpClusterConfig(NoHooksLU):
1785
  """Return a text-representation of the cluster-config.
1786

1787
  """
1788
  _OP_REQP = []
1789
  REQ_BGL = False
1790

    
1791
  def ExpandNames(self):
1792
    self.needed_locks = {}
1793

    
1794
  def CheckPrereq(self):
1795
    """No prerequisites.
1796

1797
    """
1798
    pass
1799

    
1800
  def Exec(self, feedback_fn):
1801
    """Dump a representation of the cluster config to the standard output.
1802

1803
    """
1804
    return self.cfg.DumpConfig()
1805

    
1806

    
1807
class LUActivateInstanceDisks(NoHooksLU):
1808
  """Bring up an instance's disks.
1809

1810
  """
1811
  _OP_REQP = ["instance_name"]
1812

    
1813
  def CheckPrereq(self):
1814
    """Check prerequisites.
1815

1816
    This checks that the instance is in the cluster.
1817

1818
    """
1819
    instance = self.cfg.GetInstanceInfo(
1820
      self.cfg.ExpandInstanceName(self.op.instance_name))
1821
    if instance is None:
1822
      raise errors.OpPrereqError("Instance '%s' not known" %
1823
                                 self.op.instance_name)
1824
    self.instance = instance
1825

    
1826

    
1827
  def Exec(self, feedback_fn):
1828
    """Activate the disks.
1829

1830
    """
1831
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1832
    if not disks_ok:
1833
      raise errors.OpExecError("Cannot activate block devices")
1834

    
1835
    return disks_info
1836

    
1837

    
1838
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1839
  """Prepare the block devices for an instance.
1840

1841
  This sets up the block devices on all nodes.
1842

1843
  Args:
1844
    instance: a ganeti.objects.Instance object
1845
    ignore_secondaries: if true, errors on secondary nodes won't result
1846
                        in an error return from the function
1847

1848
  Returns:
1849
    false if the operation failed
1850
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded, with the mapping from node devices to instance devices
1852
  """
1853
  device_info = []
1854
  disks_ok = True
1855
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking has occurred, but we do not eliminate it
1859

    
1860
  # The proper fix would be to wait (with some limits) until the
1861
  # connection has been made and drbd transitions from WFConnection
1862
  # into any other network-connected state (Connected, SyncTarget,
1863
  # SyncSource, etc.)
1864

    
1865
  # 1st pass, assemble on all nodes in secondary mode
1866
  for inst_disk in instance.disks:
1867
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1868
      cfg.SetDiskID(node_disk, node)
1869
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1870
      if not result:
1871
        logger.Error("could not prepare block device %s on node %s"
1872
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1873
        if not ignore_secondaries:
1874
          disks_ok = False
1875

    
1876
  # FIXME: race condition on drbd migration to primary
1877

    
1878
  # 2nd pass, do only the primary node
1879
  for inst_disk in instance.disks:
1880
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1881
      if node != instance.primary_node:
1882
        continue
1883
      cfg.SetDiskID(node_disk, node)
1884
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1885
      if not result:
1886
        logger.Error("could not prepare block device %s on node %s"
1887
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1888
        disks_ok = False
1889
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1890

    
1891
  # leave the disks configured for the primary node
1892
  # this is a workaround that would be fixed better by
1893
  # improving the logical/physical id handling
1894
  for disk in instance.disks:
1895
    cfg.SetDiskID(disk, instance.primary_node)
1896

    
1897
  return disks_ok, device_info
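

# Example return value of _AssembleInstanceDisks (hypothetical names and
# device paths): for an instance with disks "sda" and "sdb" the second
# element could look like
#   [("node1.example.com", "sda", "/dev/drbd0"),
#    ("node1.example.com", "sdb", "/dev/drbd1")]
# i.e. one (node, instance-visible name, node-visible device) tuple per disk
# on the primary node, while the first element tells whether every assembly
# step succeeded.

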
def _StartInstanceDisks(cfg, instance, force):
1901
  """Start the disks of an instance.
1902

1903
  """
1904
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1905
                                           ignore_secondaries=force)
1906
  if not disks_ok:
1907
    _ShutdownInstanceDisks(instance, cfg)
1908
    if force is not None and not force:
1909
      logger.Error("If the message above refers to a secondary node,"
1910
                   " you can retry the operation using '--force'.")
1911
    raise errors.OpExecError("Disk consistency error")
1912

    
1913

    
1914
class LUDeactivateInstanceDisks(NoHooksLU):
1915
  """Shutdown an instance's disks.
1916

1917
  """
1918
  _OP_REQP = ["instance_name"]
1919

    
1920
  def CheckPrereq(self):
1921
    """Check prerequisites.
1922

1923
    This checks that the instance is in the cluster.
1924

1925
    """
1926
    instance = self.cfg.GetInstanceInfo(
1927
      self.cfg.ExpandInstanceName(self.op.instance_name))
1928
    if instance is None:
1929
      raise errors.OpPrereqError("Instance '%s' not known" %
1930
                                 self.op.instance_name)
1931
    self.instance = instance
1932

    
1933
  def Exec(self, feedback_fn):
1934
    """Deactivate the disks
1935

1936
    """
1937
    instance = self.instance
1938
    ins_l = rpc.call_instance_list([instance.primary_node])
1939
    ins_l = ins_l[instance.primary_node]
1940
    if not type(ins_l) is list:
1941
      raise errors.OpExecError("Can't contact node '%s'" %
1942
                               instance.primary_node)
1943

    
1944
    if self.instance.name in ins_l:
1945
      raise errors.OpExecError("Instance is running, can't shutdown"
1946
                               " block devices.")
1947

    
1948
    _ShutdownInstanceDisks(instance, self.cfg)
1949

    
1950

    
1951
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1952
  """Shutdown block devices of an instance.
1953

1954
  This does the shutdown on all nodes of the instance.
1955

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they make the function return False.
1958

1959
  """
1960
  result = True
1961
  for disk in instance.disks:
1962
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1963
      cfg.SetDiskID(top_disk, node)
1964
      if not rpc.call_blockdev_shutdown(node, top_disk):
1965
        logger.Error("could not shutdown block device %s on node %s" %
1966
                     (disk.iv_name, node))
1967
        if not ignore_primary or node != instance.primary_node:
1968
          result = False
1969
  return result
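

# Minimal usage sketch (hypothetical helper; instance and cfg are objects of
# the same types used elsewhere in this module): shut down an instance's
# disks while tolerating errors on the primary node, as done during failover
# in LUFailoverInstance below.
def _ExampleShutdownDisksIgnorePrimary(instance, cfg):
  """Hypothetical wrapper illustrating _ShutdownInstanceDisks usage."""
  if not _ShutdownInstanceDisks(instance, cfg, ignore_primary=True):
    logger.Error("some block devices of instance %s could not be shut down" %
                 instance.name)

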
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1973
  """Checks if a node has enough free memory.
1974

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
1979

1980
  Args:
1981
    - cfg: a ConfigWriter instance
1982
    - node: the node name
1983
    - reason: string to use in the error message
1984
    - requested: the amount of memory in MiB
1985

1986
  """
1987
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1988
  if not nodeinfo or not isinstance(nodeinfo, dict):
1989
    raise errors.OpPrereqError("Could not contact node %s for resource"
1990
                             " information" % (node,))
1991

    
1992
  free_mem = nodeinfo[node].get('memory_free')
1993
  if not isinstance(free_mem, int):
1994
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1995
                             " was '%s'" % (node, free_mem))
1996
  if requested > free_mem:
1997
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1998
                             " needed %s MiB, available %s MiB" %
1999
                             (node, reason, requested, free_mem))
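

# Minimal usage sketch (hypothetical helper): verify that the primary node
# can hold the instance's memory before starting it, mirroring the call made
# in LUStartupInstance.CheckPrereq below.
def _ExampleCheckMemoryBeforeStart(cfg, instance):
  """Hypothetical wrapper around _CheckNodeFreeMemory."""
  _CheckNodeFreeMemory(cfg, instance.primary_node,
                       "starting instance %s" % instance.name,
                       instance.memory)

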
class LUStartupInstance(LogicalUnit):
2003
  """Starts an instance.
2004

2005
  """
2006
  HPATH = "instance-start"
2007
  HTYPE = constants.HTYPE_INSTANCE
2008
  _OP_REQP = ["instance_name", "force"]
2009
  REQ_BGL = False
2010

    
2011
  def ExpandNames(self):
2012
    self._ExpandAndLockInstance()
2013
    self.needed_locks[locking.LEVEL_NODE] = []
2014
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2015

    
2016
  def DeclareLocks(self, level):
2017
    if level == locking.LEVEL_NODE:
2018
      self._LockInstancesNodes()
2019

    
2020
  def BuildHooksEnv(self):
2021
    """Build hooks env.
2022

2023
    This runs on master, primary and secondary nodes of the instance.
2024

2025
    """
2026
    env = {
2027
      "FORCE": self.op.force,
2028
      }
2029
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2030
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2031
          list(self.instance.secondary_nodes))
2032
    return env, nl, nl
2033

    
2034
  def CheckPrereq(self):
2035
    """Check prerequisites.
2036

2037
    This checks that the instance is in the cluster.
2038

2039
    """
2040
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2041
    assert self.instance is not None, \
2042
      "Cannot retrieve locked instance %s" % self.op.instance_name
2043

    
    # check bridge existence
    _CheckInstanceBridgesExist(instance)
2046

    
2047
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2048
                         "starting instance %s" % instance.name,
2049
                         instance.memory)
2050

    
2051
  def Exec(self, feedback_fn):
2052
    """Start the instance.
2053

2054
    """
2055
    instance = self.instance
2056
    force = self.op.force
2057
    extra_args = getattr(self.op, "extra_args", "")
2058

    
2059
    self.cfg.MarkInstanceUp(instance.name)
2060

    
2061
    node_current = instance.primary_node
2062

    
2063
    _StartInstanceDisks(self.cfg, instance, force)
2064

    
2065
    if not rpc.call_instance_start(node_current, instance, extra_args):
2066
      _ShutdownInstanceDisks(instance, self.cfg)
2067
      raise errors.OpExecError("Could not start instance")
2068

    
2069

    
2070
class LURebootInstance(LogicalUnit):
2071
  """Reboot an instance.
2072

2073
  """
2074
  HPATH = "instance-reboot"
2075
  HTYPE = constants.HTYPE_INSTANCE
2076
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2077
  REQ_BGL = False
2078

    
2079
  def ExpandNames(self):
2080
    self._ExpandAndLockInstance()
2081
    self.needed_locks[locking.LEVEL_NODE] = []
2082
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2083

    
2084
  def DeclareLocks(self, level):
2085
    if level == locking.LEVEL_NODE:
2086
      self._LockInstancesNodes()
2087

    
2088
  def BuildHooksEnv(self):
2089
    """Build hooks env.
2090

2091
    This runs on master, primary and secondary nodes of the instance.
2092

2093
    """
2094
    env = {
2095
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2096
      }
2097
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2098
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2099
          list(self.instance.secondary_nodes))
2100
    return env, nl, nl
2101

    
2102
  def CheckPrereq(self):
2103
    """Check prerequisites.
2104

2105
    This checks that the instance is in the cluster.
2106

2107
    """
2108
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2109
    assert self.instance is not None, \
2110
      "Cannot retrieve locked instance %s" % self.op.instance_name
2111

    
    # check bridge existence
    _CheckInstanceBridgesExist(instance)
2114

    
2115
  def Exec(self, feedback_fn):
2116
    """Reboot the instance.
2117

2118
    """
2119
    instance = self.instance
2120
    ignore_secondaries = self.op.ignore_secondaries
2121
    reboot_type = self.op.reboot_type
2122
    extra_args = getattr(self.op, "extra_args", "")
2123

    
2124
    node_current = instance.primary_node
2125

    
2126
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2127
                           constants.INSTANCE_REBOOT_HARD,
2128
                           constants.INSTANCE_REBOOT_FULL]:
2129
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2130
                                  (constants.INSTANCE_REBOOT_SOFT,
2131
                                   constants.INSTANCE_REBOOT_HARD,
2132
                                   constants.INSTANCE_REBOOT_FULL))
2133

    
2134
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2135
                       constants.INSTANCE_REBOOT_HARD]:
2136
      if not rpc.call_instance_reboot(node_current, instance,
2137
                                      reboot_type, extra_args):
2138
        raise errors.OpExecError("Could not reboot instance")
2139
    else:
2140
      if not rpc.call_instance_shutdown(node_current, instance):
2141
        raise errors.OpExecError("could not shutdown instance for full reboot")
2142
      _ShutdownInstanceDisks(instance, self.cfg)
2143
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2144
      if not rpc.call_instance_start(node_current, instance, extra_args):
2145
        _ShutdownInstanceDisks(instance, self.cfg)
2146
        raise errors.OpExecError("Could not start instance for full reboot")
2147

    
2148
    self.cfg.MarkInstanceUp(instance.name)
2149

    
2150

    
2151
class LUShutdownInstance(LogicalUnit):
2152
  """Shutdown an instance.
2153

2154
  """
2155
  HPATH = "instance-stop"
2156
  HTYPE = constants.HTYPE_INSTANCE
2157
  _OP_REQP = ["instance_name"]
2158
  REQ_BGL = False
2159

    
2160
  def ExpandNames(self):
2161
    self._ExpandAndLockInstance()
2162
    self.needed_locks[locking.LEVEL_NODE] = []
2163
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2164

    
2165
  def DeclareLocks(self, level):
2166
    if level == locking.LEVEL_NODE:
2167
      self._LockInstancesNodes()
2168

    
2169
  def BuildHooksEnv(self):
2170
    """Build hooks env.
2171

2172
    This runs on master, primary and secondary nodes of the instance.
2173

2174
    """
2175
    env = _BuildInstanceHookEnvByObject(self.instance)
2176
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2177
          list(self.instance.secondary_nodes))
2178
    return env, nl, nl
2179

    
2180
  def CheckPrereq(self):
2181
    """Check prerequisites.
2182

2183
    This checks that the instance is in the cluster.
2184

2185
    """
2186
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2187
    assert self.instance is not None, \
2188
      "Cannot retrieve locked instance %s" % self.op.instance_name
2189

    
2190
  def Exec(self, feedback_fn):
2191
    """Shutdown the instance.
2192

2193
    """
2194
    instance = self.instance
2195
    node_current = instance.primary_node
2196
    self.cfg.MarkInstanceDown(instance.name)
2197
    if not rpc.call_instance_shutdown(node_current, instance):
2198
      logger.Error("could not shutdown instance")
2199

    
2200
    _ShutdownInstanceDisks(instance, self.cfg)
2201

    
2202

    
2203
class LUReinstallInstance(LogicalUnit):
2204
  """Reinstall an instance.
2205

2206
  """
2207
  HPATH = "instance-reinstall"
2208
  HTYPE = constants.HTYPE_INSTANCE
2209
  _OP_REQP = ["instance_name"]
2210
  REQ_BGL = False
2211

    
2212
  def ExpandNames(self):
2213
    self._ExpandAndLockInstance()
2214
    self.needed_locks[locking.LEVEL_NODE] = []
2215
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2216

    
2217
  def DeclareLocks(self, level):
2218
    if level == locking.LEVEL_NODE:
2219
      self._LockInstancesNodes()
2220

    
2221
  def BuildHooksEnv(self):
2222
    """Build hooks env.
2223

2224
    This runs on master, primary and secondary nodes of the instance.
2225

2226
    """
2227
    env = _BuildInstanceHookEnvByObject(self.instance)
2228
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2229
          list(self.instance.secondary_nodes))
2230
    return env, nl, nl
2231

    
2232
  def CheckPrereq(self):
2233
    """Check prerequisites.
2234

2235
    This checks that the instance is in the cluster and is not running.
2236

2237
    """
2238
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2239
    assert instance is not None, \
2240
      "Cannot retrieve locked instance %s" % self.op.instance_name
2241

    
2242
    if instance.disk_template == constants.DT_DISKLESS:
2243
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2244
                                 self.op.instance_name)
2245
    if instance.status != "down":
2246
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2247
                                 self.op.instance_name)
2248
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2249
    if remote_info:
2250
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2251
                                 (self.op.instance_name,
2252
                                  instance.primary_node))
2253

    
2254
    self.op.os_type = getattr(self.op, "os_type", None)
2255
    if self.op.os_type is not None:
2256
      # OS verification
2257
      pnode = self.cfg.GetNodeInfo(
2258
        self.cfg.ExpandNodeName(instance.primary_node))
2259
      if pnode is None:
2260
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2261
                                   self.op.pnode)
2262
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2263
      if not os_obj:
2264
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2265
                                   " primary node"  % self.op.os_type)
2266

    
2267
    self.instance = instance
2268

    
2269
  def Exec(self, feedback_fn):
2270
    """Reinstall the instance.
2271

2272
    """
2273
    inst = self.instance
2274

    
2275
    if self.op.os_type is not None:
2276
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2277
      inst.os = self.op.os_type
2278
      self.cfg.AddInstance(inst)
2279

    
2280
    _StartInstanceDisks(self.cfg, inst, None)
2281
    try:
2282
      feedback_fn("Running the instance OS create scripts...")
2283
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2284
        raise errors.OpExecError("Could not install OS for instance %s"
2285
                                 " on node %s" %
2286
                                 (inst.name, inst.primary_node))
2287
    finally:
2288
      _ShutdownInstanceDisks(inst, self.cfg)
2289

    
2290

    
2291
class LURenameInstance(LogicalUnit):
2292
  """Rename an instance.
2293

2294
  """
2295
  HPATH = "instance-rename"
2296
  HTYPE = constants.HTYPE_INSTANCE
2297
  _OP_REQP = ["instance_name", "new_name"]
2298

    
2299
  def BuildHooksEnv(self):
2300
    """Build hooks env.
2301

2302
    This runs on master, primary and secondary nodes of the instance.
2303

2304
    """
2305
    env = _BuildInstanceHookEnvByObject(self.instance)
2306
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2307
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2308
          list(self.instance.secondary_nodes))
2309
    return env, nl, nl
2310

    
2311
  def CheckPrereq(self):
2312
    """Check prerequisites.
2313

2314
    This checks that the instance is in the cluster and is not running.
2315

2316
    """
2317
    instance = self.cfg.GetInstanceInfo(
2318
      self.cfg.ExpandInstanceName(self.op.instance_name))
2319
    if instance is None:
2320
      raise errors.OpPrereqError("Instance '%s' not known" %
2321
                                 self.op.instance_name)
2322
    if instance.status != "down":
2323
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2324
                                 self.op.instance_name)
2325
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2326
    if remote_info:
2327
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2328
                                 (self.op.instance_name,
2329
                                  instance.primary_node))
2330
    self.instance = instance
2331

    
2332
    # new name verification
2333
    name_info = utils.HostInfo(self.op.new_name)
2334

    
2335
    self.op.new_name = new_name = name_info.name
2336
    instance_list = self.cfg.GetInstanceList()
2337
    if new_name in instance_list:
2338
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2339
                                 new_name)
2340

    
2341
    if not getattr(self.op, "ignore_ip", False):
2342
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2343
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2344
                                   (name_info.ip, new_name))
2345

    
2346

    
2347
  def Exec(self, feedback_fn):
    """Rename the instance.

2350
    """
2351
    inst = self.instance
2352
    old_name = inst.name
2353

    
2354
    if inst.disk_template == constants.DT_FILE:
2355
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2356

    
2357
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2358
    # Change the instance lock. This is definitely safe while we hold the BGL
2359
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2360
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2361

    
2362
    # re-read the instance from the configuration after rename
2363
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2364

    
2365
    if inst.disk_template == constants.DT_FILE:
2366
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2367
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2368
                                                old_file_storage_dir,
2369
                                                new_file_storage_dir)
2370

    
2371
      if not result:
2372
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2373
                                 " directory '%s' to '%s' (but the instance"
2374
                                 " has been renamed in Ganeti)" % (
2375
                                 inst.primary_node, old_file_storage_dir,
2376
                                 new_file_storage_dir))
2377

    
2378
      if not result[0]:
2379
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2380
                                 " (but the instance has been renamed in"
2381
                                 " Ganeti)" % (old_file_storage_dir,
2382
                                               new_file_storage_dir))
2383

    
2384
    _StartInstanceDisks(self.cfg, inst, None)
2385
    try:
2386
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2387
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s (but the"
               " instance has been renamed in Ganeti)" %
2390
               (inst.name, inst.primary_node))
2391
        logger.Error(msg)
2392
    finally:
2393
      _ShutdownInstanceDisks(inst, self.cfg)
2394

    
2395

    
2396
class LURemoveInstance(LogicalUnit):
2397
  """Remove an instance.
2398

2399
  """
2400
  HPATH = "instance-remove"
2401
  HTYPE = constants.HTYPE_INSTANCE
2402
  _OP_REQP = ["instance_name", "ignore_failures"]
2403

    
2404
  def BuildHooksEnv(self):
2405
    """Build hooks env.
2406

2407
    This runs on master, primary and secondary nodes of the instance.
2408

2409
    """
2410
    env = _BuildInstanceHookEnvByObject(self.instance)
2411
    nl = [self.sstore.GetMasterNode()]
2412
    return env, nl, nl
2413

    
2414
  def CheckPrereq(self):
2415
    """Check prerequisites.
2416

2417
    This checks that the instance is in the cluster.
2418

2419
    """
2420
    instance = self.cfg.GetInstanceInfo(
2421
      self.cfg.ExpandInstanceName(self.op.instance_name))
2422
    if instance is None:
2423
      raise errors.OpPrereqError("Instance '%s' not known" %
2424
                                 self.op.instance_name)
2425
    self.instance = instance
2426

    
2427
  def Exec(self, feedback_fn):
2428
    """Remove the instance.
2429

2430
    """
2431
    instance = self.instance
2432
    logger.Info("shutting down instance %s on node %s" %
2433
                (instance.name, instance.primary_node))
2434

    
2435
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2436
      if self.op.ignore_failures:
2437
        feedback_fn("Warning: can't shutdown instance")
2438
      else:
2439
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2440
                                 (instance.name, instance.primary_node))
2441

    
2442
    logger.Info("removing block devices for instance %s" % instance.name)
2443

    
2444
    if not _RemoveDisks(instance, self.cfg):
2445
      if self.op.ignore_failures:
2446
        feedback_fn("Warning: can't remove instance's disks")
2447
      else:
2448
        raise errors.OpExecError("Can't remove instance's disks")
2449

    
2450
    logger.Info("removing instance %s out of cluster config" % instance.name)
2451

    
2452
    self.cfg.RemoveInstance(instance.name)
2453
    # Remove the new instance from the Ganeti Lock Manager
2454
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2455

    
2456

    
2457
class LUQueryInstances(NoHooksLU):
2458
  """Logical unit for querying instances.
2459

2460
  """
2461
  _OP_REQP = ["output_fields", "names"]
2462

    
2463
  def CheckPrereq(self):
2464
    """Check prerequisites.
2465

2466
    This checks that the fields required are valid output fields.
2467

2468
    """
2469
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2470
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2471
                               "admin_state", "admin_ram",
2472
                               "disk_template", "ip", "mac", "bridge",
2473
                               "sda_size", "sdb_size", "vcpus", "tags"],
2474
                       dynamic=self.dynamic_fields,
2475
                       selected=self.op.output_fields)
2476

    
2477
    self.wanted = _GetWantedInstances(self, self.op.names)
2478

    
2479
  def Exec(self, feedback_fn):
2480
    """Computes the list of nodes and their attributes.
2481

2482
    """
2483
    instance_names = self.wanted
2484
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2485
                     in instance_names]
2486

    
2487
    # begin data gathering
2488

    
2489
    nodes = frozenset([inst.primary_node for inst in instance_list])
2490

    
2491
    bad_nodes = []
2492
    if self.dynamic_fields.intersection(self.op.output_fields):
2493
      live_data = {}
2494
      node_data = rpc.call_all_instances_info(nodes)
2495
      for name in nodes:
2496
        result = node_data[name]
2497
        if result:
2498
          live_data.update(result)
2499
        elif result == False:
2500
          bad_nodes.append(name)
2501
        # else no instance is alive
2502
    else:
2503
      live_data = dict([(name, {}) for name in instance_names])
2504

    
2505
    # end data gathering
2506

    
2507
    output = []
2508
    for instance in instance_list:
2509
      iout = []
2510
      for field in self.op.output_fields:
2511
        if field == "name":
2512
          val = instance.name
2513
        elif field == "os":
2514
          val = instance.os
2515
        elif field == "pnode":
2516
          val = instance.primary_node
2517
        elif field == "snodes":
2518
          val = list(instance.secondary_nodes)
2519
        elif field == "admin_state":
2520
          val = (instance.status != "down")
2521
        elif field == "oper_state":
2522
          if instance.primary_node in bad_nodes:
2523
            val = None
2524
          else:
2525
            val = bool(live_data.get(instance.name))
2526
        elif field == "status":
2527
          if instance.primary_node in bad_nodes:
2528
            val = "ERROR_nodedown"
2529
          else:
2530
            running = bool(live_data.get(instance.name))
2531
            if running:
2532
              if instance.status != "down":
2533
                val = "running"
2534
              else:
2535
                val = "ERROR_up"
2536
            else:
2537
              if instance.status != "down":
2538
                val = "ERROR_down"
2539
              else:
2540
                val = "ADMIN_down"
2541
        elif field == "admin_ram":
2542
          val = instance.memory
2543
        elif field == "oper_ram":
2544
          if instance.primary_node in bad_nodes:
2545
            val = None
2546
          elif instance.name in live_data:
2547
            val = live_data[instance.name].get("memory", "?")
2548
          else:
2549
            val = "-"
2550
        elif field == "disk_template":
2551
          val = instance.disk_template
2552
        elif field == "ip":
2553
          val = instance.nics[0].ip
2554
        elif field == "bridge":
2555
          val = instance.nics[0].bridge
2556
        elif field == "mac":
2557
          val = instance.nics[0].mac
2558
        elif field == "sda_size" or field == "sdb_size":
2559
          disk = instance.FindDisk(field[:3])
2560
          if disk is None:
2561
            val = None
2562
          else:
2563
            val = disk.size
2564
        elif field == "vcpus":
2565
          val = instance.vcpus
2566
        elif field == "tags":
2567
          val = list(instance.GetTags())
2568
        else:
2569
          raise errors.ParameterError(field)
2570
        iout.append(val)
2571
      output.append(iout)
2572

    
2573
    return output
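

# Summary of the "status" values computed above, as a worked example of the
# decision logic (the admin state comes from instance.status, the live state
# from the node query):
#   admin up,   running      -> "running"
#   admin up,   not running  -> "ERROR_down"
#   admin down, running      -> "ERROR_up"
#   admin down, not running  -> "ADMIN_down"
#   primary node unreachable -> "ERROR_nodedown"

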
class LUFailoverInstance(LogicalUnit):
2577
  """Failover an instance.
2578

2579
  """
2580
  HPATH = "instance-failover"
2581
  HTYPE = constants.HTYPE_INSTANCE
2582
  _OP_REQP = ["instance_name", "ignore_consistency"]
2583
  REQ_BGL = False
2584

    
2585
  def ExpandNames(self):
2586
    self._ExpandAndLockInstance()
2587
    self.needed_locks[locking.LEVEL_NODE] = []
2588
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2589

    
2590
  def DeclareLocks(self, level):
2591
    if level == locking.LEVEL_NODE:
2592
      self._LockInstancesNodes()
2593

    
2594
  def BuildHooksEnv(self):
2595
    """Build hooks env.
2596

2597
    This runs on master, primary and secondary nodes of the instance.
2598

2599
    """
2600
    env = {
2601
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2602
      }
2603
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2604
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2605
    return env, nl, nl
2606

    
2607
  def CheckPrereq(self):
2608
    """Check prerequisites.
2609

2610
    This checks that the instance is in the cluster.
2611

2612
    """
2613
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2614
    assert self.instance is not None, \
2615
      "Cannot retrieve locked instance %s" % self.op.instance_name
2616

    
2617
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2618
      raise errors.OpPrereqError("Instance's disk layout is not"
2619
                                 " network mirrored, cannot failover.")
2620

    
2621
    secondary_nodes = instance.secondary_nodes
2622
    if not secondary_nodes:
2623
      raise errors.ProgrammerError("no secondary node but using "
2624
                                   "a mirrored disk template")
2625

    
2626
    target_node = secondary_nodes[0]
2627
    # check memory requirements on the secondary node
2628
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2629
                         instance.name, instance.memory)
2630

    
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))
2637

    
2638
  def Exec(self, feedback_fn):
2639
    """Failover an instance.
2640

2641
    The failover is done by shutting it down on its present node and
2642
    starting it on the secondary.
2643

2644
    """
2645
    instance = self.instance
2646

    
2647
    source_node = instance.primary_node
2648
    target_node = instance.secondary_nodes[0]
2649

    
2650
    feedback_fn("* checking disk consistency between source and target")
2651
    for dev in instance.disks:
2652
      # for drbd, these are drbd over lvm
2653
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2654
        if instance.status == "up" and not self.op.ignore_consistency:
2655
          raise errors.OpExecError("Disk %s is degraded on target node,"
2656
                                   " aborting failover." % dev.iv_name)
2657

    
2658
    feedback_fn("* shutting down instance on source node")
2659
    logger.Info("Shutting down instance %s on node %s" %
2660
                (instance.name, source_node))
2661

    
2662
    if not rpc.call_instance_shutdown(source_node, instance):
2663
      if self.op.ignore_consistency:
2664
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2665
                     " anyway. Please make sure node %s is down"  %
2666
                     (instance.name, source_node, source_node))
2667
      else:
2668
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2669
                                 (instance.name, source_node))
2670

    
2671
    feedback_fn("* deactivating the instance's disks on source node")
2672
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2673
      raise errors.OpExecError("Can't shut down the instance's disks.")
2674

    
2675
    instance.primary_node = target_node
2676
    # distribute new instance config to the other nodes
2677
    self.cfg.Update(instance)
2678

    
2679
    # Only start the instance if it's marked as up
2680
    if instance.status == "up":
2681
      feedback_fn("* activating the instance's disks on target node")
2682
      logger.Info("Starting instance %s on node %s" %
2683
                  (instance.name, target_node))
2684

    
2685
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2686
                                               ignore_secondaries=True)
2687
      if not disks_ok:
2688
        _ShutdownInstanceDisks(instance, self.cfg)
2689
        raise errors.OpExecError("Can't activate the instance's disks")
2690

    
2691
      feedback_fn("* starting the instance on the target node")
2692
      if not rpc.call_instance_start(target_node, instance, None):
2693
        _ShutdownInstanceDisks(instance, self.cfg)
2694
        raise errors.OpExecError("Could not start instance %s on node %s." %
2695
                                 (instance.name, target_node))
2696

    
2697

    
2698
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2699
  """Create a tree of block devices on the primary node.
2700

2701
  This always creates all devices.
2702

2703
  """
2704
  if device.children:
2705
    for child in device.children:
2706
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2707
        return False
2708

    
2709
  cfg.SetDiskID(device, node)
2710
  new_id = rpc.call_blockdev_create(node, device, device.size,
2711
                                    instance.name, True, info)
2712
  if not new_id:
2713
    return False
2714
  if device.physical_id is None:
2715
    device.physical_id = new_id
2716
  return True
2717

    
2718

    
2719
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2720
  """Create a tree of block devices on a secondary node.
2721

2722
  If this device type has to be created on secondaries, create it and
2723
  all its children.
2724

2725
  If not, just recurse to children keeping the same 'force' value.
2726

2727
  """
2728
  if device.CreateOnSecondary():
2729
    force = True
2730
  if device.children:
2731
    for child in device.children:
2732
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2733
                                        child, force, info):
2734
        return False
2735

    
2736
  if not force:
2737
    return True
2738
  cfg.SetDiskID(device, node)
2739
  new_id = rpc.call_blockdev_create(node, device, device.size,
2740
                                    instance.name, False, info)
2741
  if not new_id:
2742
    return False
2743
  if device.physical_id is None:
2744
    device.physical_id = new_id
2745
  return True
2746

    
2747

    
2748
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given suffixes.

  """
2754
  results = []
2755
  for val in exts:
2756
    new_id = cfg.GenerateUniqueID()
2757
    results.append("%s%s" % (new_id, val))
2758
  return results
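

# Example (hypothetical result): _GenerateUniqueNames(cfg, [".sda", ".sdb"])
# asks the configuration for one unique ID per suffix and could return
# something like ["<unique-id>.sda", "<unique-id>.sdb"]; the LVs of plain
# and drbd-based instances are named this way in _GenerateDiskTemplate below.

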
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2762
  """Generate a drbd8 device complete with its children.
2763

2764
  """
2765
  port = cfg.AllocatePort()
2766
  vgname = cfg.GetVGName()
2767
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2768
                          logical_id=(vgname, names[0]))
2769
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2770
                          logical_id=(vgname, names[1]))
2771
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2772
                          logical_id = (primary, secondary, port),
2773
                          children = [dev_data, dev_meta],
2774
                          iv_name=iv_name)
2775
  return drbd_dev
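

# Sketch of the device tree built above for one drbd8 disk (sizes in MiB,
# everything else as passed in by the caller):
#
#   drbd8  size=size  logical_id=(primary, secondary, port)  iv_name=iv_name
#     +- lv (data)  size=size  logical_id=(vgname, names[0])
#     +- lv (meta)  size=128   logical_id=(vgname, names[1])

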
def _GenerateDiskTemplate(cfg, template_name,
2779
                          instance_name, primary_node,
2780
                          secondary_nodes, disk_sz, swap_sz,
2781
                          file_storage_dir, file_driver):
2782
  """Generate the entire disk layout for a given template type.
2783

2784
  """
2785
  #TODO: compute space requirements
2786

    
2787
  vgname = cfg.GetVGName()
2788
  if template_name == constants.DT_DISKLESS:
2789
    disks = []
2790
  elif template_name == constants.DT_PLAIN:
2791
    if len(secondary_nodes) != 0:
2792
      raise errors.ProgrammerError("Wrong template configuration")
2793

    
2794
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2795
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2796
                           logical_id=(vgname, names[0]),
2797
                           iv_name = "sda")
2798
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2799
                           logical_id=(vgname, names[1]),
2800
                           iv_name = "sdb")
2801
    disks = [sda_dev, sdb_dev]
2802
  elif template_name == constants.DT_DRBD8:
2803
    if len(secondary_nodes) != 1:
2804
      raise errors.ProgrammerError("Wrong template configuration")
2805
    remote_node = secondary_nodes[0]
2806
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2807
                                       ".sdb_data", ".sdb_meta"])
2808
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2809
                                         disk_sz, names[0:2], "sda")
2810
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2811
                                         swap_sz, names[2:4], "sdb")
2812
    disks = [drbd_sda_dev, drbd_sdb_dev]
2813
  elif template_name == constants.DT_FILE:
2814
    if len(secondary_nodes) != 0:
2815
      raise errors.ProgrammerError("Wrong template configuration")
2816

    
2817
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2818
                                iv_name="sda", logical_id=(file_driver,
2819
                                "%s/sda" % file_storage_dir))
2820
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2821
                                iv_name="sdb", logical_id=(file_driver,
2822
                                "%s/sdb" % file_storage_dir))
2823
    disks = [file_sda_dev, file_sdb_dev]
2824
  else:
2825
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2826
  return disks
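

# Minimal usage sketch (hypothetical instance and node names): generate the
# two-LV layout of a plain, non-mirrored instance; this is roughly the shape
# of call that an instance-creation LU would issue for DT_PLAIN.
def _ExamplePlainDiskLayout(cfg):
  """Hypothetical helper returning a DT_PLAIN disk layout."""
  return _GenerateDiskTemplate(cfg, constants.DT_PLAIN,
                               "instance1.example.com", "node1.example.com",
                               [], 10240, 4096, None, None)

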
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
2833
  return "originstname+%s" % instance.name
def _CreateDisks(cfg, instance):
2837
  """Create all disks for an instance.
2838

2839
  This abstracts away some work from AddInstance.
2840

2841
  Args:
2842
    instance: the instance object
2843

2844
  Returns:
2845
    True or False showing the success of the creation process
2846

2847
  """
2848
  info = _GetInstanceInfoText(instance)
2849

    
2850
  if instance.disk_template == constants.DT_FILE:
2851
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2852
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2853
                                              file_storage_dir)
2854

    
2855
    if not result:
2856
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2857
      return False
2858

    
2859
    if not result[0]:
2860
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2861
      return False
2862

    
2863
  for device in instance.disks:
2864
    logger.Info("creating volume %s for instance %s" %
2865
                (device.iv_name, instance.name))
2866
    #HARDCODE
2867
    for secondary_node in instance.secondary_nodes:
2868
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2869
                                        device, False, info):
2870
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2871
                     (device.iv_name, device, secondary_node))
2872
        return False
2873
    #HARDCODE
2874
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2875
                                    instance, device, info):
2876
      logger.Error("failed to create volume %s on primary!" %
2877
                   device.iv_name)
2878
      return False
2879

    
2880
  return True
2881

    
2882

    
2883
def _RemoveDisks(instance, cfg):
2884
  """Remove all disks for an instance.
2885

2886
  This abstracts away some work from `AddInstance()` and
2887
  `RemoveInstance()`. Note that in case some of the devices couldn't
2888
  be removed, the removal will continue with the other ones (compare
2889
  with `_CreateDisks()`).
2890

2891
  Args:
2892
    instance: the instance object
2893

2894
  Returns:
    True or False showing the success of the removal process

  """
2898
  logger.Info("removing block devices for instance %s" % instance.name)
2899

    
2900
  result = True
2901
  for device in instance.disks:
2902
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2903
      cfg.SetDiskID(disk, node)
2904
      if not rpc.call_blockdev_remove(node, disk):
2905
        logger.Error("could not remove block device %s on node %s,"
2906
                     " continuing anyway" %
2907
                     (device.iv_name, node))
2908
        result = False
2909

    
2910
  if instance.disk_template == constants.DT_FILE:
2911
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2912
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2913
                                            file_storage_dir):
2914
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2915
      result = False
2916

    
2917
  return result
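

# Minimal usage sketch (hypothetical helper): remove an instance's disks and
# only log on partial failure, similar to how LURemoveInstance.Exec handles
# its ignore_failures case.
def _ExampleRemoveDisksBestEffort(instance, cfg):
  """Hypothetical best-effort wrapper around _RemoveDisks."""
  if not _RemoveDisks(instance, cfg):
    logger.Error("could not remove all block devices for instance %s" %
                 instance.name)

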
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2921
  """Compute disk size requirements in the volume group
2922

2923
  This is currently hard-coded for the two-drive layout.
2924

2925
  """
2926
  # Required free disk space as a function of disk and swap space
2927
  req_size_dict = {
2928
    constants.DT_DISKLESS: None,
2929
    constants.DT_PLAIN: disk_size + swap_size,
2930
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2931
    constants.DT_DRBD8: disk_size + swap_size + 256,
2932
    constants.DT_FILE: None,
2933
  }
2934

    
2935
  if disk_template not in req_size_dict:
2936
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2937
                                 " is unknown" %  disk_template)
2938

    
2939
  return req_size_dict[disk_template]
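

# Worked example: for a drbd8-based instance with a 10240 MiB disk and a
# 4096 MiB swap, _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096) returns
# 10240 + 4096 + 256 = 14592 MiB of required volume group space, while the
# diskless and file-based templates need no volume group space at all (None).

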
class LUCreateInstance(LogicalUnit):
2943
  """Create an instance.
2944

2945
  """
2946
  HPATH = "instance-add"
2947
  HTYPE = constants.HTYPE_INSTANCE
2948
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2949
              "disk_template", "swap_size", "mode", "start", "vcpus",
2950
              "wait_for_sync", "ip_check", "mac"]
2951

    
2952
  def _RunAllocator(self):
2953
    """Run the allocator based on input opcode.
2954

2955
    """
2956
    disks = [{"size": self.op.disk_size, "mode": "w"},
2957
             {"size": self.op.swap_size, "mode": "w"}]
2958
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2959
             "bridge": self.op.bridge}]
2960
    ial = IAllocator(self.cfg, self.sstore,
2961
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2962
                     name=self.op.instance_name,
2963
                     disk_template=self.op.disk_template,
2964
                     tags=[],
2965
                     os=self.op.os_type,
2966
                     vcpus=self.op.vcpus,
2967
                     mem_size=self.op.mem_size,
2968
                     disks=disks,
2969
                     nics=nics,
2970
                     )
2971

    
2972
    ial.Run(self.op.iallocator)
2973

    
2974
    if not ial.success:
2975
      raise errors.OpPrereqError("Can't compute nodes using"
2976
                                 " iallocator '%s': %s" % (self.op.iallocator,
2977
                                                           ial.info))
2978
    if len(ial.nodes) != ial.required_nodes:
2979
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2980
                                 " of nodes (%s), required %s" %
2981
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
2982
    self.op.pnode = ial.nodes[0]
2983
    logger.ToStdout("Selected nodes for the instance: %s" %
2984
                    (", ".join(ial.nodes),))
2985
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2986
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2987
    if ial.required_nodes == 2:
2988
      self.op.snode = ial.nodes[1]
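    # Note on the expected allocator contract (added for clarity, a sketch
    # rather than a guarantee): for net-mirrored disk templates such as
    # drbd8 the iallocator should return two nodes, ial.nodes[0] becoming
    # the primary and ial.nodes[1] the secondary; non-mirrored templates
    # only need a single node.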
2989

    
2990
  def BuildHooksEnv(self):
2991
    """Build hooks env.
2992

2993
    This runs on master, primary and secondary nodes of the instance.
2994

2995
    """
2996
    env = {
2997
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2998
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2999
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3000
      "INSTANCE_ADD_MODE": self.op.mode,
3001
      }
3002
    if self.op.mode == constants.INSTANCE_IMPORT:
3003
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3004
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3005
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3006

    
3007
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3008
      primary_node=self.op.pnode,
3009
      secondary_nodes=self.secondaries,
3010
      status=self.instance_status,
3011
      os_type=self.op.os_type,
3012
      memory=self.op.mem_size,
3013
      vcpus=self.op.vcpus,
3014
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3015
    ))
3016

    
3017
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3018
          self.secondaries)
3019
    return env, nl, nl
3020

    
3021

    
3022
  def CheckPrereq(self):
3023
    """Check prerequisites.
3024

3025
    """
3026
    # set optional parameters to none if they don't exist
3027
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3028
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3029
                 "vnc_bind_address"]:
3030
      if not hasattr(self.op, attr):
3031
        setattr(self.op, attr, None)
3032

    
3033
    if self.op.mode not in (constants.INSTANCE_CREATE,
3034
                            constants.INSTANCE_IMPORT):
3035
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3036
                                 self.op.mode)
3037

    
3038
    if (not self.cfg.GetVGName() and
3039
        self.op.disk_template not in constants.DTS_NOT_LVM):
3040
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3041
                                 " instances")
3042

    
3043
    if self.op.mode == constants.INSTANCE_IMPORT:
3044
      src_node = getattr(self.op, "src_node", None)
3045
      src_path = getattr(self.op, "src_path", None)
3046
      if src_node is None or src_path is None:
3047
        raise errors.OpPrereqError("Importing an instance requires source"
3048
                                   " node and path options")
3049
      src_node_full = self.cfg.ExpandNodeName(src_node)
3050
      if src_node_full is None:
3051
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3052
      self.op.src_node = src_node = src_node_full
3053

    
3054
      if not os.path.isabs(src_path):
3055
        raise errors.OpPrereqError("The source path must be absolute")
3056

    
3057
      export_info = rpc.call_export_info(src_node, src_path)
3058

    
3059
      if not export_info:
3060
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3061

    
3062
      if not export_info.has_section(constants.INISECT_EXP):
3063
        raise errors.ProgrammerError("Corrupted export config")
3064

    
3065
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3066
      if (int(ei_version) != constants.EXPORT_VERSION):
3067
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3068
                                   (ei_version, constants.EXPORT_VERSION))
3069

    
3070
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3071
        raise errors.OpPrereqError("Can't import instance with more than"
3072
                                   " one data disk")
3073

    
3074
      # FIXME: are the old os-es, disk sizes, etc. useful?
3075
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3076
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3077
                                                         'disk0_dump'))
3078
      self.src_image = diskimage
3079
    else: # INSTANCE_CREATE
3080
      if getattr(self.op, "os_type", None) is None:
3081
        raise errors.OpPrereqError("No guest OS specified")
3082

    
3083
    #### instance parameters check
3084

    
3085
    # disk template and mirror node verification
3086
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3087
      raise errors.OpPrereqError("Invalid disk template name")
3088

    
3089
    # instance name verification
3090
    hostname1 = utils.HostInfo(self.op.instance_name)
3091

    
3092
    self.op.instance_name = instance_name = hostname1.name
3093
    instance_list = self.cfg.GetInstanceList()
3094
    if instance_name in instance_list:
3095
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3096
                                 instance_name)
3097

    
3098
    # ip validity checks
3099
    ip = getattr(self.op, "ip", None)
3100
    if ip is None or ip.lower() == "none":
3101
      inst_ip = None
3102
    elif ip.lower() == "auto":
3103
      inst_ip = hostname1.ip
3104
    else:
3105
      if not utils.IsValidIP(ip):
3106
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3107
                                   " like a valid IP" % ip)
3108
      inst_ip = ip
3109
    self.inst_ip = self.op.ip = inst_ip
3110

    
3111
    if self.op.start and not self.op.ip_check:
3112
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3113
                                 " adding an instance in start mode")
3114

    
3115
    if self.op.ip_check:
3116
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3117
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3118
                                   (hostname1.ip, instance_name))
3119

    
3120
    # MAC address verification
3121
    if self.op.mac != "auto":
3122
      if not utils.IsValidMac(self.op.mac.lower()):
3123
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3124
                                   self.op.mac)
3125

    
3126
    # bridge verification
3127
    bridge = getattr(self.op, "bridge", None)
3128
    if bridge is None:
3129
      self.op.bridge = self.cfg.GetDefBridge()
3130
    else:
3131
      self.op.bridge = bridge
3132

    
3133
    # boot order verification
3134
    if self.op.hvm_boot_order is not None:
3135
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3136
        raise errors.OpPrereqError("invalid boot order specified,"
3137
                                   " must be one or more of [acdn]")
3138
    # file storage checks
3139
    if (self.op.file_driver and
3140
        not self.op.file_driver in constants.FILE_DRIVER):
3141
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3142
                                 self.op.file_driver)
3143

    
3144
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3145
      raise errors.OpPrereqError("File storage directory not a relative"
3146
                                 " path")
3147
    #### allocator run
3148

    
3149
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3150
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3151
                                 " node must be given")
3152

    
3153
    if self.op.iallocator is not None:
3154
      self._RunAllocator()
3155

    
3156
    #### node related checks
3157

    
3158
    # check primary node
3159
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3160
    if pnode is None:
3161
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3162
                                 self.op.pnode)
3163
    self.op.pnode = pnode.name
3164
    self.pnode = pnode
3165
    self.secondaries = []
3166

    
3167
    # mirror node verification
3168
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3169
      if getattr(self.op, "snode", None) is None:
3170
        raise errors.OpPrereqError("The networked disk templates need"
3171
                                   " a mirror node")
3172

    
3173
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3174
      if snode_name is None:
3175
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3176
                                   self.op.snode)
3177
      elif snode_name == pnode.name:
3178
        raise errors.OpPrereqError("The secondary node cannot be"
3179
                                   " the primary node.")
3180
      self.secondaries.append(snode_name)
3181

    
3182
    req_size = _ComputeDiskSize(self.op.disk_template,
3183
                                self.op.disk_size, self.op.swap_size)
3184

    
3185
    # Check lv size requirements
3186
    if req_size is not None:
3187
      nodenames = [pnode.name] + self.secondaries
3188
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3189
      for node in nodenames:
3190
        info = nodeinfo.get(node, None)
3191
        if not info:
3192
          raise errors.OpPrereqError("Cannot get current information"
3193
                                     " from node '%s'" % node)
3194
        vg_free = info.get('vg_free', None)
3195
        if not isinstance(vg_free, int):
3196
          raise errors.OpPrereqError("Can't compute free disk space on"
3197
                                     " node %s" % node)
3198
        if req_size > info['vg_free']:
3199
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3200
                                     " %d MB available, %d MB required" %
3201
                                     (node, info['vg_free'], req_size))
3202

    
3203
    # os verification
3204
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3205
    if not os_obj:
3206
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3207
                                 " primary node"  % self.op.os_type)
3208

    
3209
    if self.op.kernel_path == constants.VALUE_NONE:
3210
      raise errors.OpPrereqError("Can't set instance kernel to none")
3211

    
3212

    
3213
    # bridge check on primary node
3214
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3215
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3216
                                 " destination node '%s'" %
3217
                                 (self.op.bridge, pnode.name))
3218

    
3219
    # memory check on primary node
3220
    if self.op.start:
3221
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3222
                           "creating instance %s" % self.op.instance_name,
3223
                           self.op.mem_size)
3224

    
3225
    # hvm_cdrom_image_path verification
3226
    if self.op.hvm_cdrom_image_path is not None:
3227
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3228
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3229
                                   " be an absolute path or None, not %s" %
3230
                                   self.op.hvm_cdrom_image_path)
3231
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3232
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3233
                                   " regular file or a symlink pointing to"
3234
                                   " an existing regular file, not %s" %
3235
                                   self.op.hvm_cdrom_image_path)
3236

    
3237
    # vnc_bind_address verification
3238
    if self.op.vnc_bind_address is not None:
3239
      if not utils.IsValidIP(self.op.vnc_bind_address):
3240
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3241
                                   " like a valid IP address" %
3242
                                   self.op.vnc_bind_address)
3243

    
3244
    if self.op.start:
3245
      self.instance_status = 'up'
3246
    else:
3247
      self.instance_status = 'down'
3248

    
3249
  def Exec(self, feedback_fn):
3250
    """Create and add the instance to the cluster.
3251

3252
    """
3253
    instance = self.op.instance_name
3254
    pnode_name = self.pnode.name
3255

    
3256
    if self.op.mac == "auto":
3257
      mac_address = self.cfg.GenerateMAC()
3258
    else:
3259
      mac_address = self.op.mac
3260

    
3261
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3262
    if self.inst_ip is not None:
3263
      nic.ip = self.inst_ip
3264

    
3265
    ht_kind = self.sstore.GetHypervisorType()
3266
    if ht_kind in constants.HTS_REQ_PORT:
3267
      network_port = self.cfg.AllocatePort()
3268
    else:
3269
      network_port = None
3270

    
3271
    if self.op.vnc_bind_address is None:
3272
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3273

    
3274
    # this is needed because os.path.join does not accept None arguments
3275
    if self.op.file_storage_dir is None:
3276
      string_file_storage_dir = ""
3277
    else:
3278
      string_file_storage_dir = self.op.file_storage_dir
3279

    
3280
    # build the full file storage dir path
3281
    file_storage_dir = os.path.normpath(os.path.join(
3282
                                        self.sstore.GetFileStorageDir(),
3283
                                        string_file_storage_dir, instance))
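    # Path sketch (directory and instance names are hypothetical): with a
    # cluster file storage dir of /srv/ganeti/file-storage,
    # file_storage_dir="mydir" and an instance named inst1.example.com the
    # result is /srv/ganeti/file-storage/mydir/inst1.example.com; with no
    # file_storage_dir the instance name is appended directly.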
3284

    
3285

    
3286
    disks = _GenerateDiskTemplate(self.cfg,
3287
                                  self.op.disk_template,
3288
                                  instance, pnode_name,
3289
                                  self.secondaries, self.op.disk_size,
3290
                                  self.op.swap_size,
3291
                                  file_storage_dir,
3292
                                  self.op.file_driver)
3293

    
3294
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3295
                            primary_node=pnode_name,
3296
                            memory=self.op.mem_size,
3297
                            vcpus=self.op.vcpus,
3298
                            nics=[nic], disks=disks,
3299
                            disk_template=self.op.disk_template,
3300
                            status=self.instance_status,
3301
                            network_port=network_port,
3302
                            kernel_path=self.op.kernel_path,
3303
                            initrd_path=self.op.initrd_path,
3304
                            hvm_boot_order=self.op.hvm_boot_order,
3305
                            hvm_acpi=self.op.hvm_acpi,
3306
                            hvm_pae=self.op.hvm_pae,
3307
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3308
                            vnc_bind_address=self.op.vnc_bind_address,
3309
                            )
3310

    
3311
    feedback_fn("* creating instance disks...")
3312
    if not _CreateDisks(self.cfg, iobj):
3313
      _RemoveDisks(iobj, self.cfg)
3314
      raise errors.OpExecError("Device creation failed, reverting...")
3315

    
3316
    feedback_fn("adding instance %s to cluster config" % instance)
3317

    
3318
    self.cfg.AddInstance(iobj)
3319
    # Add the new instance to the Ganeti Lock Manager
3320
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3321

    
3322
    if self.op.wait_for_sync:
3323
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3324
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3325
      # make sure the disks are not degraded (still sync-ing is ok)
3326
      time.sleep(15)
3327
      feedback_fn("* checking mirrors status")
3328
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3329
    else:
3330
      disk_abort = False
3331

    
3332
    if disk_abort:
3333
      _RemoveDisks(iobj, self.cfg)
3334
      self.cfg.RemoveInstance(iobj.name)
3335
      # Remove the new instance from the Ganeti Lock Manager
3336
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3337
      raise errors.OpExecError("There are some degraded disks for"
3338
                               " this instance")
3339

    
3340
    feedback_fn("creating os for instance %s on node %s" %
3341
                (instance, pnode_name))
3342

    
3343
    if iobj.disk_template != constants.DT_DISKLESS:
3344
      if self.op.mode == constants.INSTANCE_CREATE:
3345
        feedback_fn("* running the instance OS create scripts...")
3346
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3347
          raise errors.OpExecError("could not add os for instance %s"
3348
                                   " on node %s" %
3349
                                   (instance, pnode_name))
3350

    
3351
      elif self.op.mode == constants.INSTANCE_IMPORT:
3352
        feedback_fn("* running the instance OS import scripts...")
3353
        src_node = self.op.src_node
3354
        src_image = self.src_image
3355
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3356
                                                src_node, src_image):
3357
          raise errors.OpExecError("Could not import os for instance"
3358
                                   " %s on node %s" %
3359
                                   (instance, pnode_name))
3360
      else:
3361
        # also checked in the prereq part
3362
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3363
                                     % self.op.mode)
3364

    
3365
    if self.op.start:
3366
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3367
      feedback_fn("* starting instance...")
3368
      if not rpc.call_instance_start(pnode_name, iobj, None):
3369
        raise errors.OpExecError("Could not start instance")
3370

    
3371

    
3372
class LUConnectConsole(NoHooksLU):
3373
  """Connect to an instance's console.
3374

3375
  This is somewhat special in that it returns the command line that
3376
  you need to run on the master node in order to connect to the
3377
  console.
3378

3379
  """
3380
  _OP_REQP = ["instance_name"]
3381
  REQ_BGL = False
3382

    
3383
  def ExpandNames(self):
3384
    self._ExpandAndLockInstance()
3385

    
3386
  def CheckPrereq(self):
3387
    """Check prerequisites.
3388

3389
    This checks that the instance is in the cluster.
3390

3391
    """
3392
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3393
    assert self.instance is not None, \
3394
      "Cannot retrieve locked instance %s" % self.op.instance_name
3395

    
3396
  def Exec(self, feedback_fn):
3397
    """Connect to the console of an instance
3398

3399
    """
3400
    instance = self.instance
3401
    node = instance.primary_node
3402

    
3403
    node_insts = rpc.call_instance_list([node])[node]
3404
    if node_insts is False:
3405
      raise errors.OpExecError("Can't connect to node %s." % node)
3406

    
3407
    if instance.name not in node_insts:
3408
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3409

    
3410
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3411

    
3412
    hyper = hypervisor.GetHypervisor()
3413
    console_cmd = hyper.GetShellCommandForConsole(instance)
3414

    
3415
    # build ssh cmdline
3416
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
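    # Illustration only (the exact command depends on the hypervisor and the
    # cluster SSH settings): for the Xen PVM hypervisor the returned command
    # typically looks like
    #   ssh -t ... root@node1.example.com 'xm console inst1.example.com'
    # and is meant to be run by the caller on the master node.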
3417

    
3418

    
3419
class LUReplaceDisks(LogicalUnit):
3420
  """Replace the disks of an instance.
3421

3422
  """
3423
  HPATH = "mirrors-replace"
3424
  HTYPE = constants.HTYPE_INSTANCE
3425
  _OP_REQP = ["instance_name", "mode", "disks"]
3426

    
3427
  def _RunAllocator(self):
3428
    """Compute a new secondary node using an IAllocator.
3429

3430
    """
3431
    ial = IAllocator(self.cfg, self.sstore,
3432
                     mode=constants.IALLOCATOR_MODE_RELOC,
3433
                     name=self.op.instance_name,
3434
                     relocate_from=[self.sec_node])
3435

    
3436
    ial.Run(self.op.iallocator)
3437

    
3438
    if not ial.success:
3439
      raise errors.OpPrereqError("Can't compute nodes using"
3440
                                 " iallocator '%s': %s" % (self.op.iallocator,
3441
                                                           ial.info))
3442
    if len(ial.nodes) != ial.required_nodes:
3443
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3444
                                 " of nodes (%s), required %s" %
3445
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3446
    self.op.remote_node = ial.nodes[0]
3447
    logger.ToStdout("Selected new secondary for the instance: %s" %
3448
                    self.op.remote_node)
3449

    
3450
  def BuildHooksEnv(self):
3451
    """Build hooks env.
3452

3453
    This runs on the master, the primary and all the secondaries.
3454

3455
    """
3456
    env = {
3457
      "MODE": self.op.mode,
3458
      "NEW_SECONDARY": self.op.remote_node,
3459
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3460
      }
3461
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3462
    nl = [
3463
      self.sstore.GetMasterNode(),
3464
      self.instance.primary_node,
3465
      ]
3466
    if self.op.remote_node is not None:
3467
      nl.append(self.op.remote_node)
3468
    return env, nl, nl
3469

    
3470
  def CheckPrereq(self):
3471
    """Check prerequisites.
3472

3473
    This checks that the instance is in the cluster.
3474

3475
    """
3476
    if not hasattr(self.op, "remote_node"):
3477
      self.op.remote_node = None
3478

    
3479
    instance = self.cfg.GetInstanceInfo(
3480
      self.cfg.ExpandInstanceName(self.op.instance_name))
3481
    if instance is None:
3482
      raise errors.OpPrereqError("Instance '%s' not known" %
3483
                                 self.op.instance_name)
3484
    self.instance = instance
3485
    self.op.instance_name = instance.name
3486

    
3487
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3488
      raise errors.OpPrereqError("Instance's disk layout is not"
3489
                                 " network mirrored.")
3490

    
3491
    if len(instance.secondary_nodes) != 1:
3492
      raise errors.OpPrereqError("The instance has a strange layout,"
3493
                                 " expected one secondary but found %d" %
3494
                                 len(instance.secondary_nodes))
3495

    
3496
    self.sec_node = instance.secondary_nodes[0]
3497

    
3498
    ia_name = getattr(self.op, "iallocator", None)
3499
    if ia_name is not None:
3500
      if self.op.remote_node is not None:
3501
        raise errors.OpPrereqError("Give either the iallocator or the new"
3502
                                   " secondary, not both")
3503
      self.op.remote_node = self._RunAllocator()
3504

    
3505
    remote_node = self.op.remote_node
3506
    if remote_node is not None:
3507
      remote_node = self.cfg.ExpandNodeName(remote_node)
3508
      if remote_node is None:
3509
        raise errors.OpPrereqError("Node '%s' not known" %
3510
                                   self.op.remote_node)
3511
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3512
    else:
3513
      self.remote_node_info = None
3514
    if remote_node == instance.primary_node:
3515
      raise errors.OpPrereqError("The specified node is the primary node of"
3516
                                 " the instance.")
3517
    elif remote_node == self.sec_node:
3518
      if self.op.mode == constants.REPLACE_DISK_SEC:
3519
        # this is for DRBD8, where we can't execute the same mode of
3520
        # replacement as for drbd7 (no different port allocated)
3521
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3522
                                   " replacement")
3523
    if instance.disk_template == constants.DT_DRBD8:
3524
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3525
          remote_node is not None):
3526
        # switch to replace secondary mode
3527
        self.op.mode = constants.REPLACE_DISK_SEC
3528

    
3529
      if self.op.mode == constants.REPLACE_DISK_ALL:
3530
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3531
                                   " secondary disk replacement, not"
3532
                                   " both at once")
3533
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3534
        if remote_node is not None:
3535
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3536
                                     " the secondary while doing a primary"
3537
                                     " node disk replacement")
3538
        self.tgt_node = instance.primary_node
3539
        self.oth_node = instance.secondary_nodes[0]
3540
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3541
        self.new_node = remote_node # this can be None, in which case
3542
                                    # we don't change the secondary
3543
        self.tgt_node = instance.secondary_nodes[0]
3544
        self.oth_node = instance.primary_node
3545
      else:
3546
        raise errors.ProgrammerError("Unhandled disk replace mode")
3547

    
3548
    for name in self.op.disks:
3549
      if instance.FindDisk(name) is None:
3550
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3551
                                   (name, instance.name))
3552
    self.op.remote_node = remote_node
3553

    
3554
  def _ExecD8DiskOnly(self, feedback_fn):
3555
    """Replace a disk on the primary or secondary for dbrd8.
3556

3557
    The algorithm for replace is quite complicated:
3558
      - for each disk to be replaced:
3559
        - create new LVs on the target node with unique names
3560
        - detach old LVs from the drbd device
3561
        - rename old LVs to name_replaced.<time_t>
3562
        - rename new LVs to old LVs
3563
        - attach the new LVs (with the old names now) to the drbd device
3564
      - wait for sync across all devices
3565
      - for each modified disk:
3566
        - remove old LVs (which have the name name_replaced.<time_t>)
3567

3568
    Failures are not very well handled.
3569

3570
    """
3571
    steps_total = 6
3572
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3573
    instance = self.instance
3574
    iv_names = {}
3575
    vgname = self.cfg.GetVGName()
3576
    # start of work
3577
    cfg = self.cfg
3578
    tgt_node = self.tgt_node
3579
    oth_node = self.oth_node
3580

    
3581
    # Step: check device activation
3582
    self.proc.LogStep(1, steps_total, "check device existence")
3583
    info("checking volume groups")
3584
    my_vg = cfg.GetVGName()
3585
    results = rpc.call_vg_list([oth_node, tgt_node])
3586
    if not results:
3587
      raise errors.OpExecError("Can't list volume groups on the nodes")
3588
    for node in oth_node, tgt_node:
3589
      res = results.get(node, False)
3590
      if not res or my_vg not in res:
3591
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3592
                                 (my_vg, node))
3593
    for dev in instance.disks:
3594
      if not dev.iv_name in self.op.disks:
3595
        continue
3596
      for node in tgt_node, oth_node:
3597
        info("checking %s on %s" % (dev.iv_name, node))
3598
        cfg.SetDiskID(dev, node)
3599
        if not rpc.call_blockdev_find(node, dev):
3600
          raise errors.OpExecError("Can't find device %s on node %s" %
3601
                                   (dev.iv_name, node))
3602

    
3603
    # Step: check other node consistency
3604
    self.proc.LogStep(2, steps_total, "check peer consistency")
3605
    for dev in instance.disks:
3606
      if not dev.iv_name in self.op.disks:
3607
        continue
3608
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3609
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3610
                                   oth_node==instance.primary_node):
3611
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3612
                                 " to replace disks on this node (%s)" %
3613
                                 (oth_node, tgt_node))
3614

    
3615
    # Step: create new storage
3616
    self.proc.LogStep(3, steps_total, "allocate new storage")
3617
    for dev in instance.disks:
3618
      if not dev.iv_name in self.op.disks:
3619
        continue
3620
      size = dev.size
3621
      cfg.SetDiskID(dev, tgt_node)
3622
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3623
      names = _GenerateUniqueNames(cfg, lv_names)
3624
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3625
                             logical_id=(vgname, names[0]))
3626
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3627
                             logical_id=(vgname, names[1]))
3628
      new_lvs = [lv_data, lv_meta]
3629
      old_lvs = dev.children
3630
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3631
      info("creating new local storage on %s for %s" %
3632
           (tgt_node, dev.iv_name))
3633
      # since we *always* want to create this LV, we use the
3634
      # _Create...OnPrimary (which forces the creation), even if we
3635
      # are talking about the secondary node
3636
      for new_lv in new_lvs:
3637
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3638
                                        _GetInstanceInfoText(instance)):
3639
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3640
                                   " node '%s'" %
3641
                                   (new_lv.logical_id[1], tgt_node))
3642

    
3643
    # Step: for each lv, detach+rename*2+attach
3644
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3645
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3646
      info("detaching %s drbd from local storage" % dev.iv_name)
3647
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3648
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3649
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3650
      #dev.children = []
3651
      #cfg.Update(instance)
3652

    
3653
      # ok, we created the new LVs, so now we know we have the needed
3654
      # storage; as such, we proceed on the target node to rename
3655
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3656
      # using the assumption that logical_id == physical_id (which in
3657
      # turn is the unique_id on that node)
3658

    
3659
      # FIXME(iustin): use a better name for the replaced LVs
3660
      temp_suffix = int(time.time())
3661
      ren_fn = lambda d, suff: (d.physical_id[0],
3662
                                d.physical_id[1] + "_replaced-%s" % suff)
3663
      # build the rename list based on what LVs exist on the node
3664
      rlist = []
3665
      for to_ren in old_lvs:
3666
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3667
        if find_res is not None: # device exists
3668
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3669

    
3670
      info("renaming the old LVs on the target node")
3671
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3672
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3673
      # now we rename the new LVs to the old LVs
3674
      info("renaming the new LVs on the target node")
3675
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3676
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3677
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3678

    
3679
      for old, new in zip(old_lvs, new_lvs):
3680
        new.logical_id = old.logical_id
3681
        cfg.SetDiskID(new, tgt_node)
3682

    
3683
      for disk in old_lvs:
3684
        disk.logical_id = ren_fn(disk, temp_suffix)
3685
        cfg.SetDiskID(disk, tgt_node)
3686

    
3687
      # now that the new lvs have the old name, we can add them to the device
3688
      info("adding new mirror component on %s" % tgt_node)
3689
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3690
        for new_lv in new_lvs:
3691
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3692
            warning("Can't rollback device %s", hint="manually cleanup unused"
3693
                    " logical volumes")
3694
        raise errors.OpExecError("Can't add local storage to drbd")
3695

    
3696
      dev.children = new_lvs
3697
      cfg.Update(instance)
3698

    
3699
    # Step: wait for sync
3700

    
3701
    # this can fail as the old devices are degraded and _WaitForSync
3702
    # does a combined result over all disks, so we don't check its
3703
    # return value
3704
    self.proc.LogStep(5, steps_total, "sync devices")
3705
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3706

    
3707
    # so check manually all the devices
3708
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3709
      cfg.SetDiskID(dev, instance.primary_node)
3710
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3711
      if is_degr:
3712
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3713

    
3714
    # Step: remove old storage
3715
    self.proc.LogStep(6, steps_total, "removing old storage")
3716
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3717
      info("remove logical volumes for %s" % name)
3718
      for lv in old_lvs:
3719
        cfg.SetDiskID(lv, tgt_node)
3720
        if not rpc.call_blockdev_remove(tgt_node, lv):
3721
          warning("Can't remove old LV", hint="manually remove unused LVs")
3722
          continue
3723

    
3724
  def _ExecD8Secondary(self, feedback_fn):
3725
    """Replace the secondary node for drbd8.
3726

3727
    The algorithm for replace is quite complicated:
3728
      - for all disks of the instance:
3729
        - create new LVs on the new node with same names
3730
        - shutdown the drbd device on the old secondary
3731
        - disconnect the drbd network on the primary
3732
        - create the drbd device on the new secondary
3733
        - network attach the drbd on the primary, using an artifice:
3734
          the drbd code for Attach() will connect to the network if it
3735
          finds a device which is connected to the good local disks but
3736
          not network enabled
3737
      - wait for sync across all devices
3738
      - remove all disks from the old secondary
3739

3740
    Failures are not very well handled.
3741

3742
    """
3743
    steps_total = 6
3744
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3745
    instance = self.instance
3746
    iv_names = {}
3747
    vgname = self.cfg.GetVGName()
3748
    # start of work
3749
    cfg = self.cfg
3750
    old_node = self.tgt_node
3751
    new_node = self.new_node
3752
    pri_node = instance.primary_node
3753

    
3754
    # Step: check device activation
3755
    self.proc.LogStep(1, steps_total, "check device existence")
3756
    info("checking volume groups")
3757
    my_vg = cfg.GetVGName()
3758
    results = rpc.call_vg_list([pri_node, new_node])
3759
    if not results:
3760
      raise errors.OpExecError("Can't list volume groups on the nodes")
3761
    for node in pri_node, new_node:
3762
      res = results.get(node, False)
3763
      if not res or my_vg not in res:
3764
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3765
                                 (my_vg, node))
3766
    for dev in instance.disks:
3767
      if not dev.iv_name in self.op.disks:
3768
        continue
3769
      info("checking %s on %s" % (dev.iv_name, pri_node))
3770
      cfg.SetDiskID(dev, pri_node)
3771
      if not rpc.call_blockdev_find(pri_node, dev):
3772
        raise errors.OpExecError("Can't find device %s on node %s" %
3773
                                 (dev.iv_name, pri_node))
3774

    
3775
    # Step: check other node consistency
3776
    self.proc.LogStep(2, steps_total, "check peer consistency")
3777
    for dev in instance.disks:
3778
      if not dev.iv_name in self.op.disks:
3779
        continue
3780
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3781
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3782
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3783
                                 " unsafe to replace the secondary" %
3784
                                 pri_node)
3785

    
3786
    # Step: create new storage
3787
    self.proc.LogStep(3, steps_total, "allocate new storage")
3788
    for dev in instance.disks:
3789
      size = dev.size
3790
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3791
      # since we *always* want to create this LV, we use the
3792
      # _Create...OnPrimary (which forces the creation), even if we
3793
      # are talking about the secondary node
3794
      for new_lv in dev.children:
3795
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3796
                                        _GetInstanceInfoText(instance)):
3797
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3798
                                   " node '%s'" %
3799
                                   (new_lv.logical_id[1], new_node))
3800

    
3801
      iv_names[dev.iv_name] = (dev, dev.children)
3802

    
3803
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3804
    for dev in instance.disks:
3805
      size = dev.size
3806
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3807
      # create new devices on new_node
3808
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3809
                              logical_id=(pri_node, new_node,
3810
                                          dev.logical_id[2]),
3811
                              children=dev.children)
3812
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3813
                                        new_drbd, False,
3814
                                      _GetInstanceInfoText(instance)):
3815
        raise errors.OpExecError("Failed to create new DRBD on"
3816
                                 " node '%s'" % new_node)
3817

    
3818
    for dev in instance.disks:
3819
      # we have new devices, shutdown the drbd on the old secondary
3820
      info("shutting down drbd for %s on old node" % dev.iv_name)
3821
      cfg.SetDiskID(dev, old_node)
3822
      if not rpc.call_blockdev_shutdown(old_node, dev):
3823
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3824
                hint="Please cleanup this device manually as soon as possible")
3825

    
3826
    info("detaching primary drbds from the network (=> standalone)")
3827
    done = 0
3828
    for dev in instance.disks:
3829
      cfg.SetDiskID(dev, pri_node)
3830
      # set the physical (unique in bdev terms) id to None, meaning
3831
      # detach from network
3832
      dev.physical_id = (None,) * len(dev.physical_id)
3833
      # and 'find' the device, which will 'fix' it to match the
3834
      # standalone state
3835
      if rpc.call_blockdev_find(pri_node, dev):
3836
        done += 1
3837
      else:
3838
        warning("Failed to detach drbd %s from network, unusual case" %
3839
                dev.iv_name)
3840

    
3841
    if not done:
3842
      # no detaches succeeded (very unlikely)
3843
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3844

    
3845
    # if we managed to detach at least one, we update all the disks of
3846
    # the instance to point to the new secondary
3847
    info("updating instance configuration")
3848
    for dev in instance.disks:
3849
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3850
      cfg.SetDiskID(dev, pri_node)
3851
    cfg.Update(instance)
3852

    
3853
    # and now perform the drbd attach
3854
    info("attaching primary drbds to new secondary (standalone => connected)")
3855
    failures = []
3856
    for dev in instance.disks:
3857
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3858
      # since the attach is smart, it's enough to 'find' the device,
3859
      # it will automatically activate the network, if the physical_id
3860
      # is correct
3861
      cfg.SetDiskID(dev, pri_node)
3862
      if not rpc.call_blockdev_find(pri_node, dev):
3863
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3864
                "please do a gnt-instance info to see the status of disks")
3865

    
3866
    # this can fail as the old devices are degraded and _WaitForSync
3867
    # does a combined result over all disks, so we don't check its
3868
    # return value
3869
    self.proc.LogStep(5, steps_total, "sync devices")
3870
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3871

    
3872
    # so check manually all the devices
3873
    for name, (dev, old_lvs) in iv_names.iteritems():
3874
      cfg.SetDiskID(dev, pri_node)
3875
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3876
      if is_degr:
3877
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3878

    
3879
    self.proc.LogStep(6, steps_total, "removing old storage")
3880
    for name, (dev, old_lvs) in iv_names.iteritems():
3881
      info("remove logical volumes for %s" % name)
3882
      for lv in old_lvs:
3883
        cfg.SetDiskID(lv, old_node)
3884
        if not rpc.call_blockdev_remove(old_node, lv):
3885
          warning("Can't remove LV on old secondary",
3886
                  hint="Cleanup stale volumes by hand")
3887

    
3888
  def Exec(self, feedback_fn):
3889
    """Execute disk replacement.
3890

3891
    This dispatches the disk replacement to the appropriate handler.
3892

3893
    """
3894
    instance = self.instance
3895

    
3896
    # Activate the instance disks if we're replacing them on a down instance
3897
    if instance.status == "down":
3898
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3899
      self.proc.ChainOpCode(op)
3900

    
3901
    if instance.disk_template == constants.DT_DRBD8:
3902
      if self.op.remote_node is None:
3903
        fn = self._ExecD8DiskOnly
3904
      else:
3905
        fn = self._ExecD8Secondary
3906
    else:
3907
      raise errors.ProgrammerError("Unhandled disk replacement case")
3908

    
3909
    ret = fn(feedback_fn)
3910

    
3911
    # Deactivate the instance disks if we're replacing them on a down instance
3912
    if instance.status == "down":
3913
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3914
      self.proc.ChainOpCode(op)
3915

    
3916
    return ret
3917

    
3918

    
3919
class LUGrowDisk(LogicalUnit):
3920
  """Grow a disk of an instance.
3921

3922
  """
3923
  HPATH = "disk-grow"
3924
  HTYPE = constants.HTYPE_INSTANCE
3925
  _OP_REQP = ["instance_name", "disk", "amount"]
3926

    
3927
  def BuildHooksEnv(self):
3928
    """Build hooks env.
3929

3930
    This runs on the master, the primary and all the secondaries.
3931

3932
    """
3933
    env = {
3934
      "DISK": self.op.disk,
3935
      "AMOUNT": self.op.amount,
3936
      }
3937
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3938
    nl = [
3939
      self.sstore.GetMasterNode(),
3940
      self.instance.primary_node,
3941
      ]
3942
    return env, nl, nl
3943

    
3944
  def CheckPrereq(self):
3945
    """Check prerequisites.
3946

3947
    This checks that the instance is in the cluster.
3948

3949
    """
3950
    instance = self.cfg.GetInstanceInfo(
3951
      self.cfg.ExpandInstanceName(self.op.instance_name))
3952
    if instance is None:
3953
      raise errors.OpPrereqError("Instance '%s' not known" %
3954
                                 self.op.instance_name)
3955
    self.instance = instance
3956
    self.op.instance_name = instance.name
3957

    
3958
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3959
      raise errors.OpPrereqError("Instance's disk layout does not support"
3960
                                 " growing.")
3961

    
3962
    if instance.FindDisk(self.op.disk) is None:
3963
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3964
                                 (self.op.disk, instance.name))
3965

    
3966
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3967
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3968
    for node in nodenames:
3969
      info = nodeinfo.get(node, None)
3970
      if not info:
3971
        raise errors.OpPrereqError("Cannot get current information"
3972
                                   " from node '%s'" % node)
3973
      vg_free = info.get('vg_free', None)
3974
      if not isinstance(vg_free, int):
3975
        raise errors.OpPrereqError("Can't compute free disk space on"
3976
                                   " node %s" % node)
3977
      if self.op.amount > info['vg_free']:
3978
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3979
                                   " %d MiB available, %d MiB required" %
3980
                                   (node, info['vg_free'], self.op.amount))
3981

    
3982
  def Exec(self, feedback_fn):
3983
    """Execute disk grow.
3984

3985
    """
3986
    instance = self.instance
3987
    disk = instance.FindDisk(self.op.disk)
3988
    for node in (instance.secondary_nodes + (instance.primary_node,)):
3989
      self.cfg.SetDiskID(disk, node)
3990
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3991
      if not result or not isinstance(result, tuple) or len(result) != 2:
3992
        raise errors.OpExecError("grow request failed to node %s" % node)
3993
      elif not result[0]:
3994
        raise errors.OpExecError("grow request failed to node %s: %s" %
3995
                                 (node, result[1]))
3996
    disk.RecordGrow(self.op.amount)
3997
    self.cfg.Update(instance)
3998
    return
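    # Usage sketch (hedged; parameter names as used by this LU): growing
    # "sda" by 1024 MiB would be requested with an opcode carrying
    # instance_name="inst1.example.com", disk="sda", amount=1024. The
    # amount is an increment rather than a new total size: it is checked
    # against the free volume group space above and recorded via
    # disk.RecordGrow().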
3999

    
4000

    
4001
class LUQueryInstanceData(NoHooksLU):
4002
  """Query runtime instance data.
4003

4004
  """
4005
  _OP_REQP = ["instances"]
4006

    
4007
  def CheckPrereq(self):
4008
    """Check prerequisites.
4009

4010
    This only checks the optional instance list against the existing names.
4011

4012
    """
4013
    if not isinstance(self.op.instances, list):
4014
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4015
    if self.op.instances:
4016
      self.wanted_instances = []
4017
      names = self.op.instances
4018
      for name in names:
4019
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4020
        if instance is None:
4021
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4022
        self.wanted_instances.append(instance)
4023
    else:
4024
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4025
                               in self.cfg.GetInstanceList()]
4026
    return
4027

    
4028

    
4029
  def _ComputeDiskStatus(self, instance, snode, dev):
4030
    """Compute block device status.
4031

4032
    """
4033
    self.cfg.SetDiskID(dev, instance.primary_node)
4034
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4035
    if dev.dev_type in constants.LDS_DRBD:
4036
      # we change the snode then (otherwise we use the one passed in)
4037
      if dev.logical_id[0] == instance.primary_node:
4038
        snode = dev.logical_id[1]
4039
      else:
4040
        snode = dev.logical_id[0]
4041

    
4042
    if snode:
4043
      self.cfg.SetDiskID(dev, snode)
4044
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4045
    else:
4046
      dev_sstatus = None
4047

    
4048
    if dev.children:
4049
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4050
                      for child in dev.children]
4051
    else:
4052
      dev_children = []
4053

    
4054
    data = {
4055
      "iv_name": dev.iv_name,
4056
      "dev_type": dev.dev_type,
4057
      "logical_id": dev.logical_id,
4058
      "physical_id": dev.physical_id,
4059
      "pstatus": dev_pstatus,
4060
      "sstatus": dev_sstatus,
4061
      "children": dev_children,
4062
      }
4063

    
4064
    return data
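    # Shape sketch of the returned structure (values hypothetical): for a
    # drbd8 disk this yields a dict with iv_name="sda", dev_type=LD_DRBD8,
    # pstatus/sstatus holding the blockdev_find results from the primary
    # and secondary node, and "children" holding one such dict per
    # underlying logical volume (dev_type=LD_LV), each with an empty
    # children list of its own.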
4065

    
4066
  def Exec(self, feedback_fn):
4067
    """Gather and return data"""
4068
    result = {}
4069
    for instance in self.wanted_instances:
4070
      remote_info = rpc.call_instance_info(instance.primary_node,
4071
                                                instance.name)
4072
      if remote_info and "state" in remote_info:
4073
        remote_state = "up"
4074
      else:
4075
        remote_state = "down"
4076
      if instance.status == "down":
4077
        config_state = "down"
4078
      else:
4079
        config_state = "up"
4080

    
4081
      disks = [self._ComputeDiskStatus(instance, None, device)
4082
               for device in instance.disks]
4083

    
4084
      idict = {
4085
        "name": instance.name,
4086
        "config_state": config_state,
4087
        "run_state": remote_state,
4088
        "pnode": instance.primary_node,
4089
        "snodes": instance.secondary_nodes,
4090
        "os": instance.os,
4091
        "memory": instance.memory,
4092
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4093
        "disks": disks,
4094
        "vcpus": instance.vcpus,
4095
        }
4096

    
4097
      htkind = self.sstore.GetHypervisorType()
4098
      if htkind == constants.HT_XEN_PVM30:
4099
        idict["kernel_path"] = instance.kernel_path
4100
        idict["initrd_path"] = instance.initrd_path
4101

    
4102
      if htkind == constants.HT_XEN_HVM31:
4103
        idict["hvm_boot_order"] = instance.hvm_boot_order
4104
        idict["hvm_acpi"] = instance.hvm_acpi
4105
        idict["hvm_pae"] = instance.hvm_pae
4106
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4107

    
4108
      if htkind in constants.HTS_REQ_PORT:
4109
        idict["vnc_bind_address"] = instance.vnc_bind_address
4110
        idict["network_port"] = instance.network_port
4111

    
4112
      result[instance.name] = idict
4113

    
4114
    return result
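    # Result sketch (keys as built above, values hypothetical):
    #   {"inst1.example.com": {"name": "inst1.example.com",
    #                          "config_state": "up", "run_state": "down",
    #                          "pnode": "node1", "snodes": ["node2"],
    #                          "os": "debian-etch", "memory": 512,
    #                          "nics": [(mac, ip, bridge)], "disks": [...],
    #                          "vcpus": 1}}
    # plus the hypervisor-specific keys added above where applicable.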
4115

    
4116

    
4117
class LUSetInstanceParams(LogicalUnit):
4118
  """Modifies an instances's parameters.
4119

4120
  """
4121
  HPATH = "instance-modify"
4122
  HTYPE = constants.HTYPE_INSTANCE
4123
  _OP_REQP = ["instance_name"]
4124
  REQ_BGL = False
4125

    
4126
  def ExpandNames(self):
4127
    self._ExpandAndLockInstance()
4128

    
4129
  def BuildHooksEnv(self):
4130
    """Build hooks env.
4131

4132
    This runs on the master, primary and secondaries.
4133

4134
    """
4135
    args = dict()
4136
    if self.mem:
4137
      args['memory'] = self.mem
4138
    if self.vcpus:
4139
      args['vcpus'] = self.vcpus
4140
    if self.do_ip or self.do_bridge or self.mac:
4141
      if self.do_ip:
4142
        ip = self.ip
4143
      else:
4144
        ip = self.instance.nics[0].ip
4145
      if self.bridge:
4146
        bridge = self.bridge
4147
      else:
4148
        bridge = self.instance.nics[0].bridge
4149
      if self.mac:
4150
        mac = self.mac
4151
      else:
4152
        mac = self.instance.nics[0].mac
4153
      args['nics'] = [(ip, bridge, mac)]
4154
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                    " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                    " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
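    # 'result' collects (parameter, new value) pairs describing what was
    # actually changed, e.g. [("mem", 512), ("vcpus", 2)] (illustrative values)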
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
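    # Illustrative shape of the returned structure (hypothetical names):
    #   {"node1.example.com": ["instance1.example.com"],
    #    "node2.example.com": []}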
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

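    # Outline of the export: snapshot the instance's "sda" disk on the primary
    # node, restart the instance if it was shut down, copy each snapshot to
    # the target node and remove it, finalize the export there, and finally
    # drop any older export of this instance kept on other nodes.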
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
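    # Build (path, taggable object) pairs for the cluster, every instance and
    # every node; the result is a list of (path, tag) pairs, e.g.
    # [("/instances/instance1.example.com", "web")] (illustrative)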
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
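      # 'result' is expected to map each node name to its outcome, e.g.
      # {"node1.example.com": True} (illustrative); a false value for the
      # whole dict means the RPC call itself failed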
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
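  # Rough sketch of the structure built by _BuildInputData (illustrative; the
  # authoritative layout is in _ComputeClusterData and the _Add*Instance
  # methods below):
  #   in_data = {
  #     "version": 1,
  #     "cluster_name": ..., "cluster_tags": [...], "hypervisor_type": ...,
  #     "nodes": {node_name: {"total_memory": ..., "free_disk": ..., ...}},
  #     "instances": {instance_name: {"memory": ..., "nics": [...], ...}},
  #     "request": {"type": "allocate" or "relocate", ...},
  #   }
  # in_text is the same data serialized for the external allocator script.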
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
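    # node_data should map each node name to a dict with the memory/vg/cpu
    # attributes checked below; values are validated and converted to int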
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
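    # Expected shape of the allocator's reply, as enforced below
    # (illustrative values):
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}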
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result