lib/cmdlib.py @ 34290825
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
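
# Illustrative sketch (not part of the original module): a minimal concurrent
# LU built on the base classes above. The opcode name OpPingInstance and its
# parameter are hypothetical; the sketch only shows how ExpandNames,
# DeclareLocks, CheckPrereq and Exec typically fit together for an LU that
# runs without the Big Ganeti Lock.
#
# class LUPingInstance(NoHooksLU):
#   _OP_REQP = ["instance_name"]
#   REQ_BGL = False
#
#   def ExpandNames(self):
#     # canonicalise the instance name, lock it, and ask for its nodes to be
#     # recalculated and locked at the node level
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = 1
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()
#
#   def CheckPrereq(self):
#     # instance_name has already been expanded by ExpandNames
#     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#   def Exec(self, feedback_fn):
#     feedback_fn("Instance %s lives on %s" %
#                 (self.instance.name, self.instance.primary_node))
#     return self.instance.primary_node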


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
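
# Usage sketch (illustrative, not part of the original module): a query LU
# typically calls _CheckOutputFields from its CheckPrereq with its static and
# dynamic field names; any unknown field raises OpPrereqError. The field names
# below are made up for the example.
#
# _CheckOutputFields(static=["name", "pip"],
#                    dynamic=["mfree", "dtotal"],
#                    selected=self.op.output_fields)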


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
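
# For illustration only (not part of the original module): for an instance
# with one NIC, the helpers above produce an environment along these lines
# (all values are made up); the hooks runner later prefixes each key with
# "GANETI_":
#
#   {"OP_TARGET": "inst1.example.com",
#    "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_SECONDARIES": "node2.example.com",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512,
#    "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33"}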


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
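
  # Worked example (illustration only, not from the original code): assume
  # node B reports mfree=2048 MB and is secondary for two instances whose
  # primary is node A, needing 1024 MB and 1536 MB. The per-primary sum is
  # 1024 + 1536 = 2560 MB > 2048 MB, so the check above flags node B as not
  # N+1 compliant for failures of node A.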

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase and their failure
    makes the output be logged in the verify output and the verification
    to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
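
  # Shape of the value returned by Exec above, noted here for illustration
  # (the example values are made up): a 4-tuple of
  #   (unreachable_nodes,            e.g. ["node3"]
  #    per_node_lvm_errors,          e.g. {"node2": "error enumerating LVs..."}
  #    instances_with_offline_lvs,   e.g. ["inst1"]
  #    missing_lvs_by_instance)      e.g. {"inst2": [("node1", "lv_name")]}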


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
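
# Illustration (not part of the original module, and the constructor keywords
# and LD_DRBD8 constant are assumptions): for a mirrored disk whose children
# are two logical volumes, the recursion above returns True because a child
# has dev_type == constants.LD_LV, e.g.:
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   lv_b = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   mirror = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   _RecursiveCheckIfLVMBased(mirror)   # -> True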


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-OS per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with OS names as keys and as value another map, with
             nodes as keys and lists of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
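
  # Example of the result shape (illustrative values only): for
  # output_fields == ["name", "pinst_cnt", "mfree"], Exec returns one row per
  # node, with values in the same order as the requested fields, e.g.
  #   [["node1.example.com", 2, 3580],
  #    ["node2.example.com", 0, 7929]]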
1466

    
1467

    
1468
class LUQueryNodeVolumes(NoHooksLU):
1469
  """Logical unit for getting volumes on node(s).
1470

1471
  """
1472
  _OP_REQP = ["nodes", "output_fields"]
1473

    
1474
  def CheckPrereq(self):
1475
    """Check prerequisites.
1476

1477
    This checks that the fields required are valid output fields.
1478

1479
    """
1480
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1481

    
1482
    _CheckOutputFields(static=["node"],
1483
                       dynamic=["phys", "vg", "name", "size", "instance"],
1484
                       selected=self.op.output_fields)
1485

    
1486

    
1487
  def Exec(self, feedback_fn):
1488
    """Computes the list of nodes and their attributes.
1489

1490
    """
1491
    nodenames = self.nodes
1492
    volumes = rpc.call_node_volumes(nodenames)
1493

    
1494
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1495
             in self.cfg.GetInstanceList()]
1496

    
1497
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1498

    
1499
    output = []
1500
    for node in nodenames:
1501
      if node not in volumes or not volumes[node]:
1502
        continue
1503

    
1504
      node_vols = volumes[node][:]
1505
      node_vols.sort(key=lambda vol: vol['dev'])
1506

    
1507
      for vol in node_vols:
1508
        node_output = []
1509
        for field in self.op.output_fields:
1510
          if field == "node":
1511
            val = node
1512
          elif field == "phys":
1513
            val = vol['dev']
1514
          elif field == "vg":
1515
            val = vol['vg']
1516
          elif field == "name":
1517
            val = vol['name']
1518
          elif field == "size":
1519
            val = int(float(vol['size']))
1520
          elif field == "instance":
1521
            for inst in ilist:
1522
              if node not in lv_by_node[inst]:
1523
                continue
1524
              if vol['name'] in lv_by_node[inst][node]:
1525
                val = inst.name
1526
                break
1527
            else:
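              # for/else clause: the loop found no instance owning this LV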
1528
              val = '-'
1529
          else:
1530
            raise errors.ParameterError(field)
1531
          node_output.append(str(val))
1532

    
1533
        output.append(node_output)
1534

    
1535
    return output
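# Editor's note: an illustrative sketch, not part of the original file, of
# the lv_by_node mapping used above to attribute each LV to its owning
# instance.  The real keys are objects.Instance objects; instance names are
# used here only for readability and all values are invented.
def _ExampleLvByNode():
  """Illustrative only: per-instance mapping of node name to its LV names."""
  return {
    "instance1.example.com": {
      "node1.example.com": ["51a2b3c4.sda_data", "51a2b3c4.sda_meta",
                            "51a2b3c4.sdb_data", "51a2b3c4.sdb_meta"],
      },
    }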
1536

    
1537

    
1538
class LUAddNode(LogicalUnit):
1539
  """Logical unit for adding node to the cluster.
1540

1541
  """
1542
  HPATH = "node-add"
1543
  HTYPE = constants.HTYPE_NODE
1544
  _OP_REQP = ["node_name"]
1545

    
1546
  def BuildHooksEnv(self):
1547
    """Build hooks env.
1548

1549
    This will run on all nodes before, and on all nodes + the new node after.
1550

1551
    """
1552
    env = {
1553
      "OP_TARGET": self.op.node_name,
1554
      "NODE_NAME": self.op.node_name,
1555
      "NODE_PIP": self.op.primary_ip,
1556
      "NODE_SIP": self.op.secondary_ip,
1557
      }
1558
    nodes_0 = self.cfg.GetNodeList()
1559
    nodes_1 = nodes_0 + [self.op.node_name, ]
1560
    return env, nodes_0, nodes_1
1561

    
1562
  def CheckPrereq(self):
1563
    """Check prerequisites.
1564

1565
    This checks:
1566
     - the new node is not already in the config
1567
     - it is resolvable
1568
     - its parameters (single/dual homed) match the cluster
1569

1570
    Any errors are signalled by raising errors.OpPrereqError.
1571

1572
    """
1573
    node_name = self.op.node_name
1574
    cfg = self.cfg
1575

    
1576
    dns_data = utils.HostInfo(node_name)
1577

    
1578
    node = dns_data.name
1579
    primary_ip = self.op.primary_ip = dns_data.ip
1580
    secondary_ip = getattr(self.op, "secondary_ip", None)
1581
    if secondary_ip is None:
1582
      secondary_ip = primary_ip
1583
    if not utils.IsValidIP(secondary_ip):
1584
      raise errors.OpPrereqError("Invalid secondary IP given")
1585
    self.op.secondary_ip = secondary_ip
1586

    
1587
    node_list = cfg.GetNodeList()
1588
    if not self.op.readd and node in node_list:
1589
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1590
                                 node)
1591
    elif self.op.readd and node not in node_list:
1592
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1593

    
1594
    for existing_node_name in node_list:
1595
      existing_node = cfg.GetNodeInfo(existing_node_name)
1596

    
1597
      if self.op.readd and node == existing_node_name:
1598
        if (existing_node.primary_ip != primary_ip or
1599
            existing_node.secondary_ip != secondary_ip):
1600
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1601
                                     " address configuration as before")
1602
        continue
1603

    
1604
      if (existing_node.primary_ip == primary_ip or
1605
          existing_node.secondary_ip == primary_ip or
1606
          existing_node.primary_ip == secondary_ip or
1607
          existing_node.secondary_ip == secondary_ip):
1608
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1609
                                   " existing node %s" % existing_node.name)
1610

    
1611
    # check that the type of the node (single versus dual homed) is the
1612
    # same as for the master
1613
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1614
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1615
    newbie_singlehomed = secondary_ip == primary_ip
1616
    if master_singlehomed != newbie_singlehomed:
1617
      if master_singlehomed:
1618
        raise errors.OpPrereqError("The master has no private ip but the"
1619
                                   " new node has one")
1620
      else:
1621
        raise errors.OpPrereqError("The master has a private ip but the"
1622
                                   " new node doesn't have one")
1623

    
1624
    # checks reachability
1625
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1626
      raise errors.OpPrereqError("Node not reachable by ping")
1627

    
1628
    if not newbie_singlehomed:
1629
      # check reachability from my secondary ip to newbie's secondary ip
1630
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1631
                           source=myself.secondary_ip):
1632
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1633
                                   " based ping to noded port")
1634

    
1635
    self.new_node = objects.Node(name=node,
1636
                                 primary_ip=primary_ip,
1637
                                 secondary_ip=secondary_ip)
1638

    
1639
  def Exec(self, feedback_fn):
1640
    """Adds the new node to the cluster.
1641

1642
    """
1643
    new_node = self.new_node
1644
    node = new_node.name
1645

    
1646
    # check connectivity
1647
    result = rpc.call_version([node])[node]
1648
    if result:
1649
      if constants.PROTOCOL_VERSION == result:
1650
        logger.Info("communication to node %s fine, sw version %s match" %
1651
                    (node, result))
1652
      else:
1653
        raise errors.OpExecError("Version mismatch master version %s,"
1654
                                 " node version %s" %
1655
                                 (constants.PROTOCOL_VERSION, result))
1656
    else:
1657
      raise errors.OpExecError("Cannot get version from the new node")
1658

    
1659
    # setup ssh on node
1660
    logger.Info("copy ssh key to node %s" % node)
1661
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1662
    keyarray = []
1663
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1664
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1665
                priv_key, pub_key]
1666

    
1667
    for i in keyfiles:
1668
      f = open(i, 'r')
1669
      try:
1670
        keyarray.append(f.read())
1671
      finally:
1672
        f.close()
1673

    
1674
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1675
                               keyarray[3], keyarray[4], keyarray[5])
1676

    
1677
    if not result:
1678
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1679

    
1680
    # Add node to our /etc/hosts, and add key to known_hosts
1681
    utils.AddHostToEtcHosts(new_node.name)
1682

    
1683
    if new_node.secondary_ip != new_node.primary_ip:
1684
      if not rpc.call_node_tcp_ping(new_node.name,
1685
                                    constants.LOCALHOST_IP_ADDRESS,
1686
                                    new_node.secondary_ip,
1687
                                    constants.DEFAULT_NODED_PORT,
1688
                                    10, False):
1689
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1690
                                 " you gave (%s). Please fix and re-run this"
1691
                                 " command." % new_node.secondary_ip)
1692

    
1693
    node_verify_list = [self.sstore.GetMasterNode()]
1694
    node_verify_param = {
1695
      'nodelist': [node],
1696
      # TODO: do a node-net-test as well?
1697
    }
1698

    
1699
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1700
    for verifier in node_verify_list:
1701
      if not result[verifier]:
1702
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1703
                                 " for remote verification" % verifier)
1704
      if result[verifier]['nodelist']:
1705
        for failed in result[verifier]['nodelist']:
1706
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1707
                      (verifier, result[verifier]['nodelist'][failed]))
1708
        raise errors.OpExecError("ssh/hostname verification failed.")
1709

    
1710
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1711
    # including the node just added
1712
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1713
    dist_nodes = self.cfg.GetNodeList()
1714
    if not self.op.readd:
1715
      dist_nodes.append(node)
1716
    if myself.name in dist_nodes:
1717
      dist_nodes.remove(myself.name)
1718

    
1719
    logger.Debug("Copying hosts and known_hosts to all nodes")
1720
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1721
      result = rpc.call_upload_file(dist_nodes, fname)
1722
      for to_node in dist_nodes:
1723
        if not result[to_node]:
1724
          logger.Error("copy of file %s to node %s failed" %
1725
                       (fname, to_node))
1726

    
1727
    to_copy = self.sstore.GetFileList()
1728
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1729
      to_copy.append(constants.VNC_PASSWORD_FILE)
1730
    for fname in to_copy:
1731
      result = rpc.call_upload_file([node], fname)
1732
      if not result[node]:
1733
        logger.Error("could not copy file %s to node %s" % (fname, node))
1734

    
1735
    if self.op.readd:
1736
      self.context.ReaddNode(new_node)
1737
    else:
1738
      self.context.AddNode(new_node)
1739

    
1740

    
1741
class LUQueryClusterInfo(NoHooksLU):
1742
  """Query cluster configuration.
1743

1744
  """
1745
  _OP_REQP = []
1746
  REQ_MASTER = False
1747
  REQ_BGL = False
1748

    
1749
  def ExpandNames(self):
1750
    self.needed_locks = {}
1751

    
1752
  def CheckPrereq(self):
1753
    """No prerequsites needed for this LU.
1754

1755
    """
1756
    pass
1757

    
1758
  def Exec(self, feedback_fn):
1759
    """Return cluster config.
1760

1761
    """
1762
    result = {
1763
      "name": self.sstore.GetClusterName(),
1764
      "software_version": constants.RELEASE_VERSION,
1765
      "protocol_version": constants.PROTOCOL_VERSION,
1766
      "config_version": constants.CONFIG_VERSION,
1767
      "os_api_version": constants.OS_API_VERSION,
1768
      "export_version": constants.EXPORT_VERSION,
1769
      "master": self.sstore.GetMasterNode(),
1770
      "architecture": (platform.architecture()[0], platform.machine()),
1771
      "hypervisor_type": self.sstore.GetHypervisorType(),
1772
      }
1773

    
1774
    return result
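# Editor's note: an invented example, not part of the original code, showing
# the shape of the dictionary returned above; most values are placeholders,
# while the "architecture" entry really is a
# (platform.architecture()[0], platform.machine()) tuple such as the one
# shown.
_EXAMPLE_CLUSTER_INFO = {
  "name": "cluster1.example.com",
  "software_version": "1.9.9",
  "protocol_version": 13,
  "config_version": 3,
  "os_api_version": 5,
  "export_version": 0,
  "master": "node1.example.com",
  "architecture": ("64bit", "x86_64"),
  "hypervisor_type": "xen-3.0",
  }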
1775

    
1776

    
1777
class LUDumpClusterConfig(NoHooksLU):
1778
  """Return a text-representation of the cluster-config.
1779

1780
  """
1781
  _OP_REQP = []
1782
  REQ_BGL = False
1783

    
1784
  def ExpandNames(self):
1785
    self.needed_locks = {}
1786

    
1787
  def CheckPrereq(self):
1788
    """No prerequisites.
1789

1790
    """
1791
    pass
1792

    
1793
  def Exec(self, feedback_fn):
1794
    """Dump a representation of the cluster config to the standard output.
1795

1796
    """
1797
    return self.cfg.DumpConfig()
1798

    
1799

    
1800
class LUActivateInstanceDisks(NoHooksLU):
1801
  """Bring up an instance's disks.
1802

1803
  """
1804
  _OP_REQP = ["instance_name"]
1805

    
1806
  def CheckPrereq(self):
1807
    """Check prerequisites.
1808

1809
    This checks that the instance is in the cluster.
1810

1811
    """
1812
    instance = self.cfg.GetInstanceInfo(
1813
      self.cfg.ExpandInstanceName(self.op.instance_name))
1814
    if instance is None:
1815
      raise errors.OpPrereqError("Instance '%s' not known" %
1816
                                 self.op.instance_name)
1817
    self.instance = instance
1818

    
1819

    
1820
  def Exec(self, feedback_fn):
1821
    """Activate the disks.
1822

1823
    """
1824
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1825
    if not disks_ok:
1826
      raise errors.OpExecError("Cannot activate block devices")
1827

    
1828
    return disks_info
1829

    
1830

    
1831
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1832
  """Prepare the block devices for an instance.
1833

1834
  This sets up the block devices on all nodes.
1835

1836
  Args:
1837
    instance: a ganeti.objects.Instance object
1838
    ignore_secondaries: if true, errors on secondary nodes won't result
1839
                        in an error return from the function
1840

1841
  Returns:
1842
    false if the operation failed
1843
    list of (host, instance_visible_name, node_visible_name) if the operation
1844
         succeeded, with the mapping from node devices to instance devices
1845
  """
1846
  device_info = []
1847
  disks_ok = True
1848
  iname = instance.name
1849
  # With the two passes mechanism we try to reduce the window of
1850
  # opportunity for the race condition of switching DRBD to primary
1851
  # before handshaking occurred, but we do not eliminate it
1852

    
1853
  # The proper fix would be to wait (with some limits) until the
1854
  # connection has been made and drbd transitions from WFConnection
1855
  # into any other network-connected state (Connected, SyncTarget,
1856
  # SyncSource, etc.)
1857

    
1858
  # 1st pass, assemble on all nodes in secondary mode
1859
  for inst_disk in instance.disks:
1860
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1861
      cfg.SetDiskID(node_disk, node)
1862
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1863
      if not result:
1864
        logger.Error("could not prepare block device %s on node %s"
1865
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1866
        if not ignore_secondaries:
1867
          disks_ok = False
1868

    
1869
  # FIXME: race condition on drbd migration to primary
1870

    
1871
  # 2nd pass, do only the primary node
1872
  for inst_disk in instance.disks:
1873
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1874
      if node != instance.primary_node:
1875
        continue
1876
      cfg.SetDiskID(node_disk, node)
1877
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1878
      if not result:
1879
        logger.Error("could not prepare block device %s on node %s"
1880
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1881
        disks_ok = False
1882
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1883

    
1884
  # leave the disks configured for the primary node
1885
  # this is a workaround that would be fixed better by
1886
  # improving the logical/physical id handling
1887
  for disk in instance.disks:
1888
    cfg.SetDiskID(disk, instance.primary_node)
1889

    
1890
  return disks_ok, device_info
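# Editor's note: an illustrative sketch, not part of the original module, of
# the device_info list returned above.  Each entry is (host,
# instance_visible_name, node_visible_name) as documented in the docstring;
# the concrete values below are invented for a DRBD-based instance and assume
# that call_blockdev_assemble reports the node-visible device path.
def _ExampleAssembleResult():
  """Illustrative only: shape of the device_info list built above."""
  return [("node1.example.com", "sda", "/dev/drbd0"),
          ("node1.example.com", "sdb", "/dev/drbd1")]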
1891

    
1892

    
1893
def _StartInstanceDisks(cfg, instance, force):
1894
  """Start the disks of an instance.
1895

1896
  """
1897
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1898
                                           ignore_secondaries=force)
1899
  if not disks_ok:
1900
    _ShutdownInstanceDisks(instance, cfg)
1901
    if force is not None and not force:
1902
      logger.Error("If the message above refers to a secondary node,"
1903
                   " you can retry the operation using '--force'.")
1904
    raise errors.OpExecError("Disk consistency error")
1905

    
1906

    
1907
class LUDeactivateInstanceDisks(NoHooksLU):
1908
  """Shutdown an instance's disks.
1909

1910
  """
1911
  _OP_REQP = ["instance_name"]
1912

    
1913
  def CheckPrereq(self):
1914
    """Check prerequisites.
1915

1916
    This checks that the instance is in the cluster.
1917

1918
    """
1919
    instance = self.cfg.GetInstanceInfo(
1920
      self.cfg.ExpandInstanceName(self.op.instance_name))
1921
    if instance is None:
1922
      raise errors.OpPrereqError("Instance '%s' not known" %
1923
                                 self.op.instance_name)
1924
    self.instance = instance
1925

    
1926
  def Exec(self, feedback_fn):
1927
    """Deactivate the disks
1928

1929
    """
1930
    instance = self.instance
1931
    ins_l = rpc.call_instance_list([instance.primary_node])
1932
    ins_l = ins_l[instance.primary_node]
1933
    if not type(ins_l) is list:
1934
      raise errors.OpExecError("Can't contact node '%s'" %
1935
                               instance.primary_node)
1936

    
1937
    if self.instance.name in ins_l:
1938
      raise errors.OpExecError("Instance is running, can't shutdown"
1939
                               " block devices.")
1940

    
1941
    _ShutdownInstanceDisks(instance, self.cfg)
1942

    
1943

    
1944
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1945
  """Shutdown block devices of an instance.
1946

1947
  This does the shutdown on all nodes of the instance.
1948

1949
  Errors on the primary node are ignored only if ignore_primary is
  true; errors on any other node always cause a False return value.
1951

1952
  """
1953
  result = True
1954
  for disk in instance.disks:
1955
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1956
      cfg.SetDiskID(top_disk, node)
1957
      if not rpc.call_blockdev_shutdown(node, top_disk):
1958
        logger.Error("could not shutdown block device %s on node %s" %
1959
                     (disk.iv_name, node))
1960
        if not ignore_primary or node != instance.primary_node:
1961
          result = False
1962
  return result
1963

    
1964

    
1965
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1966
  """Checks if a node has enough free memory.
1967

1968
  This function checks if a given node has the needed amount of free
1969
  memory. In case the node has less memory or we cannot get the
1970
  information from the node, this function raises an OpPrereqError
1971
  exception.
1972

1973
  Args:
1974
    - cfg: a ConfigWriter instance
1975
    - node: the node name
1976
    - reason: string to use in the error message
1977
    - requested: the amount of memory in MiB
1978

1979
  """
1980
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1981
  if not nodeinfo or not isinstance(nodeinfo, dict):
1982
    raise errors.OpPrereqError("Could not contact node %s for resource"
1983
                             " information" % (node,))
1984

    
1985
  free_mem = nodeinfo[node].get('memory_free')
1986
  if not isinstance(free_mem, int):
1987
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1988
                             " was '%s'" % (node, free_mem))
1989
  if requested > free_mem:
1990
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1991
                             " needed %s MiB, available %s MiB" %
1992
                             (node, reason, requested, free_mem))
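# Editor's note: an illustrative usage sketch, not part of the original code,
# mirroring the way LUStartupInstance.CheckPrereq below calls the helper
# above.  With e.g. requested=2048 and only 1024 MiB reported free, the
# helper raises OpPrereqError("Not enough memory on node X for Y: needed
# 2048 MiB, available 1024 MiB").
def _ExampleMemoryCheck(cfg, instance):
  """Illustrative only: typical invocation of _CheckNodeFreeMemory."""
  _CheckNodeFreeMemory(cfg, instance.primary_node,
                       "starting instance %s" % instance.name,
                       instance.memory)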
1993

    
1994

    
1995
class LUStartupInstance(LogicalUnit):
1996
  """Starts an instance.
1997

1998
  """
1999
  HPATH = "instance-start"
2000
  HTYPE = constants.HTYPE_INSTANCE
2001
  _OP_REQP = ["instance_name", "force"]
2002
  REQ_BGL = False
2003

    
2004
  def ExpandNames(self):
2005
    self._ExpandAndLockInstance()
2006
    self.needed_locks[locking.LEVEL_NODE] = []
2007
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2008

    
2009
  def DeclareLocks(self, level):
2010
    if level == locking.LEVEL_NODE:
2011
      self._LockInstancesNodes()
2012

    
2013
  def BuildHooksEnv(self):
2014
    """Build hooks env.
2015

2016
    This runs on master, primary and secondary nodes of the instance.
2017

2018
    """
2019
    env = {
2020
      "FORCE": self.op.force,
2021
      }
2022
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2023
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2024
          list(self.instance.secondary_nodes))
2025
    return env, nl, nl
2026

    
2027
  def CheckPrereq(self):
2028
    """Check prerequisites.
2029

2030
    This checks that the instance is in the cluster.
2031

2032
    """
2033
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2034
    assert self.instance is not None, \
2035
      "Cannot retrieve locked instance %s" % self.op.instance_name
2036

    
2037
    # check bridges existence
2038
    _CheckInstanceBridgesExist(instance)
2039

    
2040
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2041
                         "starting instance %s" % instance.name,
2042
                         instance.memory)
2043

    
2044
  def Exec(self, feedback_fn):
2045
    """Start the instance.
2046

2047
    """
2048
    instance = self.instance
2049
    force = self.op.force
2050
    extra_args = getattr(self.op, "extra_args", "")
2051

    
2052
    self.cfg.MarkInstanceUp(instance.name)
2053

    
2054
    node_current = instance.primary_node
2055

    
2056
    _StartInstanceDisks(self.cfg, instance, force)
2057

    
2058
    if not rpc.call_instance_start(node_current, instance, extra_args):
2059
      _ShutdownInstanceDisks(instance, self.cfg)
2060
      raise errors.OpExecError("Could not start instance")
2061

    
2062

    
2063
class LURebootInstance(LogicalUnit):
2064
  """Reboot an instance.
2065

2066
  """
2067
  HPATH = "instance-reboot"
2068
  HTYPE = constants.HTYPE_INSTANCE
2069
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2070
  REQ_BGL = False
2071

    
2072
  def ExpandNames(self):
2073
    self._ExpandAndLockInstance()
2074
    self.needed_locks[locking.LEVEL_NODE] = []
2075
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2076

    
2077
  def DeclareLocks(self, level):
2078
    if level == locking.LEVEL_NODE:
2079
      self._LockInstancesNodes()
2080

    
2081
  def BuildHooksEnv(self):
2082
    """Build hooks env.
2083

2084
    This runs on master, primary and secondary nodes of the instance.
2085

2086
    """
2087
    env = {
2088
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2089
      }
2090
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2091
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2092
          list(self.instance.secondary_nodes))
2093
    return env, nl, nl
2094

    
2095
  def CheckPrereq(self):
2096
    """Check prerequisites.
2097

2098
    This checks that the instance is in the cluster.
2099

2100
    """
2101
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2102
    assert self.instance is not None, \
2103
      "Cannot retrieve locked instance %s" % self.op.instance_name
2104

    
2105
    # check bridges existence
2106
    _CheckInstanceBridgesExist(instance)
2107

    
2108
  def Exec(self, feedback_fn):
2109
    """Reboot the instance.
2110

2111
    """
2112
    instance = self.instance
2113
    ignore_secondaries = self.op.ignore_secondaries
2114
    reboot_type = self.op.reboot_type
2115
    extra_args = getattr(self.op, "extra_args", "")
2116

    
2117
    node_current = instance.primary_node
2118

    
2119
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2120
                           constants.INSTANCE_REBOOT_HARD,
2121
                           constants.INSTANCE_REBOOT_FULL]:
2122
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2123
                                  (constants.INSTANCE_REBOOT_SOFT,
2124
                                   constants.INSTANCE_REBOOT_HARD,
2125
                                   constants.INSTANCE_REBOOT_FULL))
2126

    
2127
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2128
                       constants.INSTANCE_REBOOT_HARD]:
2129
      if not rpc.call_instance_reboot(node_current, instance,
2130
                                      reboot_type, extra_args):
2131
        raise errors.OpExecError("Could not reboot instance")
2132
    else:
2133
      if not rpc.call_instance_shutdown(node_current, instance):
2134
        raise errors.OpExecError("could not shutdown instance for full reboot")
2135
      _ShutdownInstanceDisks(instance, self.cfg)
2136
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2137
      if not rpc.call_instance_start(node_current, instance, extra_args):
2138
        _ShutdownInstanceDisks(instance, self.cfg)
2139
        raise errors.OpExecError("Could not start instance for full reboot")
2140

    
2141
    self.cfg.MarkInstanceUp(instance.name)
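# Editor's note: an illustrative summary, not part of the original module, of
# how LURebootInstance.Exec above handles each reboot type.
_EXAMPLE_REBOOT_BEHAVIOUR = {
  constants.INSTANCE_REBOOT_SOFT: "rpc.call_instance_reboot on the primary",
  constants.INSTANCE_REBOOT_HARD: "rpc.call_instance_reboot on the primary",
  constants.INSTANCE_REBOOT_FULL: ("rpc.call_instance_shutdown, restart of"
                                   " the disks, then"
                                   " rpc.call_instance_start"),
  }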
2142

    
2143

    
2144
class LUShutdownInstance(LogicalUnit):
2145
  """Shutdown an instance.
2146

2147
  """
2148
  HPATH = "instance-stop"
2149
  HTYPE = constants.HTYPE_INSTANCE
2150
  _OP_REQP = ["instance_name"]
2151
  REQ_BGL = False
2152

    
2153
  def ExpandNames(self):
2154
    self._ExpandAndLockInstance()
2155
    self.needed_locks[locking.LEVEL_NODE] = []
2156
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2157

    
2158
  def DeclareLocks(self, level):
2159
    if level == locking.LEVEL_NODE:
2160
      self._LockInstancesNodes()
2161

    
2162
  def BuildHooksEnv(self):
2163
    """Build hooks env.
2164

2165
    This runs on master, primary and secondary nodes of the instance.
2166

2167
    """
2168
    env = _BuildInstanceHookEnvByObject(self.instance)
2169
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2170
          list(self.instance.secondary_nodes))
2171
    return env, nl, nl
2172

    
2173
  def CheckPrereq(self):
2174
    """Check prerequisites.
2175

2176
    This checks that the instance is in the cluster.
2177

2178
    """
2179
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2180
    assert self.instance is not None, \
2181
      "Cannot retrieve locked instance %s" % self.op.instance_name
2182

    
2183
  def Exec(self, feedback_fn):
2184
    """Shutdown the instance.
2185

2186
    """
2187
    instance = self.instance
2188
    node_current = instance.primary_node
2189
    self.cfg.MarkInstanceDown(instance.name)
2190
    if not rpc.call_instance_shutdown(node_current, instance):
2191
      logger.Error("could not shutdown instance")
2192

    
2193
    _ShutdownInstanceDisks(instance, self.cfg)
2194

    
2195

    
2196
class LUReinstallInstance(LogicalUnit):
2197
  """Reinstall an instance.
2198

2199
  """
2200
  HPATH = "instance-reinstall"
2201
  HTYPE = constants.HTYPE_INSTANCE
2202
  _OP_REQP = ["instance_name"]
2203
  REQ_BGL = False
2204

    
2205
  def ExpandNames(self):
2206
    self._ExpandAndLockInstance()
2207
    self.needed_locks[locking.LEVEL_NODE] = []
2208
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2209

    
2210
  def DeclareLocks(self, level):
2211
    if level == locking.LEVEL_NODE:
2212
      self._LockInstancesNodes()
2213

    
2214
  def BuildHooksEnv(self):
2215
    """Build hooks env.
2216

2217
    This runs on master, primary and secondary nodes of the instance.
2218

2219
    """
2220
    env = _BuildInstanceHookEnvByObject(self.instance)
2221
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2222
          list(self.instance.secondary_nodes))
2223
    return env, nl, nl
2224

    
2225
  def CheckPrereq(self):
2226
    """Check prerequisites.
2227

2228
    This checks that the instance is in the cluster and is not running.
2229

2230
    """
2231
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2232
    assert instance is not None, \
2233
      "Cannot retrieve locked instance %s" % self.op.instance_name
2234

    
2235
    if instance.disk_template == constants.DT_DISKLESS:
2236
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2237
                                 self.op.instance_name)
2238
    if instance.status != "down":
2239
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2240
                                 self.op.instance_name)
2241
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2242
    if remote_info:
2243
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2244
                                 (self.op.instance_name,
2245
                                  instance.primary_node))
2246

    
2247
    self.op.os_type = getattr(self.op, "os_type", None)
2248
    if self.op.os_type is not None:
2249
      # OS verification
2250
      pnode = self.cfg.GetNodeInfo(
2251
        self.cfg.ExpandNodeName(instance.primary_node))
2252
      if pnode is None:
2253
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2254
                                   self.op.pnode)
2255
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2256
      if not os_obj:
2257
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2258
                                   " primary node"  % self.op.os_type)
2259

    
2260
    self.instance = instance
2261

    
2262
  def Exec(self, feedback_fn):
2263
    """Reinstall the instance.
2264

2265
    """
2266
    inst = self.instance
2267

    
2268
    if self.op.os_type is not None:
2269
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2270
      inst.os = self.op.os_type
2271
      self.cfg.AddInstance(inst)
2272

    
2273
    _StartInstanceDisks(self.cfg, inst, None)
2274
    try:
2275
      feedback_fn("Running the instance OS create scripts...")
2276
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2277
        raise errors.OpExecError("Could not install OS for instance %s"
2278
                                 " on node %s" %
2279
                                 (inst.name, inst.primary_node))
2280
    finally:
2281
      _ShutdownInstanceDisks(inst, self.cfg)
2282

    
2283

    
2284
class LURenameInstance(LogicalUnit):
2285
  """Rename an instance.
2286

2287
  """
2288
  HPATH = "instance-rename"
2289
  HTYPE = constants.HTYPE_INSTANCE
2290
  _OP_REQP = ["instance_name", "new_name"]
2291

    
2292
  def BuildHooksEnv(self):
2293
    """Build hooks env.
2294

2295
    This runs on master, primary and secondary nodes of the instance.
2296

2297
    """
2298
    env = _BuildInstanceHookEnvByObject(self.instance)
2299
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2300
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2301
          list(self.instance.secondary_nodes))
2302
    return env, nl, nl
2303

    
2304
  def CheckPrereq(self):
2305
    """Check prerequisites.
2306

2307
    This checks that the instance is in the cluster and is not running.
2308

2309
    """
2310
    instance = self.cfg.GetInstanceInfo(
2311
      self.cfg.ExpandInstanceName(self.op.instance_name))
2312
    if instance is None:
2313
      raise errors.OpPrereqError("Instance '%s' not known" %
2314
                                 self.op.instance_name)
2315
    if instance.status != "down":
2316
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2317
                                 self.op.instance_name)
2318
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2319
    if remote_info:
2320
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2321
                                 (self.op.instance_name,
2322
                                  instance.primary_node))
2323
    self.instance = instance
2324

    
2325
    # new name verification
2326
    name_info = utils.HostInfo(self.op.new_name)
2327

    
2328
    self.op.new_name = new_name = name_info.name
2329
    instance_list = self.cfg.GetInstanceList()
2330
    if new_name in instance_list:
2331
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2332
                                 new_name)
2333

    
2334
    if not getattr(self.op, "ignore_ip", False):
2335
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2336
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2337
                                   (name_info.ip, new_name))
2338

    
2339

    
2340
  def Exec(self, feedback_fn):
2341
    """Reinstall the instance.
2342

2343
    """
2344
    inst = self.instance
2345
    old_name = inst.name
2346

    
2347
    if inst.disk_template == constants.DT_FILE:
2348
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2349

    
2350
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2351
    # Change the instance lock. This is definitely safe while we hold the BGL
2352
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2353
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2354

    
2355
    # re-read the instance from the configuration after rename
2356
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2357

    
2358
    if inst.disk_template == constants.DT_FILE:
2359
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2360
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2361
                                                old_file_storage_dir,
2362
                                                new_file_storage_dir)
2363

    
2364
      if not result:
2365
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2366
                                 " directory '%s' to '%s' (but the instance"
2367
                                 " has been renamed in Ganeti)" % (
2368
                                 inst.primary_node, old_file_storage_dir,
2369
                                 new_file_storage_dir))
2370

    
2371
      if not result[0]:
2372
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2373
                                 " (but the instance has been renamed in"
2374
                                 " Ganeti)" % (old_file_storage_dir,
2375
                                               new_file_storage_dir))
2376

    
2377
    _StartInstanceDisks(self.cfg, inst, None)
2378
    try:
2379
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2380
                                          "sda", "sdb"):
2381
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2382
               " instance has been renamed in Ganeti)" %
2383
               (inst.name, inst.primary_node))
2384
        logger.Error(msg)
2385
    finally:
2386
      _ShutdownInstanceDisks(inst, self.cfg)
2387

    
2388

    
2389
class LURemoveInstance(LogicalUnit):
2390
  """Remove an instance.
2391

2392
  """
2393
  HPATH = "instance-remove"
2394
  HTYPE = constants.HTYPE_INSTANCE
2395
  _OP_REQP = ["instance_name", "ignore_failures"]
2396

    
2397
  def BuildHooksEnv(self):
2398
    """Build hooks env.
2399

2400
    This runs on master, primary and secondary nodes of the instance.
2401

2402
    """
2403
    env = _BuildInstanceHookEnvByObject(self.instance)
2404
    nl = [self.sstore.GetMasterNode()]
2405
    return env, nl, nl
2406

    
2407
  def CheckPrereq(self):
2408
    """Check prerequisites.
2409

2410
    This checks that the instance is in the cluster.
2411

2412
    """
2413
    instance = self.cfg.GetInstanceInfo(
2414
      self.cfg.ExpandInstanceName(self.op.instance_name))
2415
    if instance is None:
2416
      raise errors.OpPrereqError("Instance '%s' not known" %
2417
                                 self.op.instance_name)
2418
    self.instance = instance
2419

    
2420
  def Exec(self, feedback_fn):
2421
    """Remove the instance.
2422

2423
    """
2424
    instance = self.instance
2425
    logger.Info("shutting down instance %s on node %s" %
2426
                (instance.name, instance.primary_node))
2427

    
2428
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2429
      if self.op.ignore_failures:
2430
        feedback_fn("Warning: can't shutdown instance")
2431
      else:
2432
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2433
                                 (instance.name, instance.primary_node))
2434

    
2435
    logger.Info("removing block devices for instance %s" % instance.name)
2436

    
2437
    if not _RemoveDisks(instance, self.cfg):
2438
      if self.op.ignore_failures:
2439
        feedback_fn("Warning: can't remove instance's disks")
2440
      else:
2441
        raise errors.OpExecError("Can't remove instance's disks")
2442

    
2443
    logger.Info("removing instance %s out of cluster config" % instance.name)
2444

    
2445
    self.cfg.RemoveInstance(instance.name)
2446
    # Remove the instance from the Ganeti Lock Manager
2447
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2448

    
2449

    
2450
class LUQueryInstances(NoHooksLU):
2451
  """Logical unit for querying instances.
2452

2453
  """
2454
  _OP_REQP = ["output_fields", "names"]
2455

    
2456
  def CheckPrereq(self):
2457
    """Check prerequisites.
2458

2459
    This checks that the fields required are valid output fields.
2460

2461
    """
2462
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2463
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2464
                               "admin_state", "admin_ram",
2465
                               "disk_template", "ip", "mac", "bridge",
2466
                               "sda_size", "sdb_size", "vcpus", "tags"],
2467
                       dynamic=self.dynamic_fields,
2468
                       selected=self.op.output_fields)
2469

    
2470
    self.wanted = _GetWantedInstances(self, self.op.names)
2471

    
2472
  def Exec(self, feedback_fn):
2473
    """Computes the list of nodes and their attributes.
2474

2475
    """
2476
    instance_names = self.wanted
2477
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2478
                     in instance_names]
2479

    
2480
    # begin data gathering
2481

    
2482
    nodes = frozenset([inst.primary_node for inst in instance_list])
2483

    
2484
    bad_nodes = []
2485
    if self.dynamic_fields.intersection(self.op.output_fields):
2486
      live_data = {}
2487
      node_data = rpc.call_all_instances_info(nodes)
2488
      for name in nodes:
2489
        result = node_data[name]
2490
        if result:
2491
          live_data.update(result)
2492
        elif result == False:
2493
          bad_nodes.append(name)
2494
        # else no instance is alive
2495
    else:
2496
      live_data = dict([(name, {}) for name in instance_names])
2497

    
2498
    # end data gathering
2499

    
2500
    output = []
2501
    for instance in instance_list:
2502
      iout = []
2503
      for field in self.op.output_fields:
2504
        if field == "name":
2505
          val = instance.name
2506
        elif field == "os":
2507
          val = instance.os
2508
        elif field == "pnode":
2509
          val = instance.primary_node
2510
        elif field == "snodes":
2511
          val = list(instance.secondary_nodes)
2512
        elif field == "admin_state":
2513
          val = (instance.status != "down")
2514
        elif field == "oper_state":
2515
          if instance.primary_node in bad_nodes:
2516
            val = None
2517
          else:
2518
            val = bool(live_data.get(instance.name))
2519
        elif field == "status":
2520
          if instance.primary_node in bad_nodes:
2521
            val = "ERROR_nodedown"
2522
          else:
2523
            running = bool(live_data.get(instance.name))
2524
            if running:
2525
              if instance.status != "down":
2526
                val = "running"
2527
              else:
2528
                val = "ERROR_up"
2529
            else:
2530
              if instance.status != "down":
2531
                val = "ERROR_down"
2532
              else:
2533
                val = "ADMIN_down"
2534
        elif field == "admin_ram":
2535
          val = instance.memory
2536
        elif field == "oper_ram":
2537
          if instance.primary_node in bad_nodes:
2538
            val = None
2539
          elif instance.name in live_data:
2540
            val = live_data[instance.name].get("memory", "?")
2541
          else:
2542
            val = "-"
2543
        elif field == "disk_template":
2544
          val = instance.disk_template
2545
        elif field == "ip":
2546
          val = instance.nics[0].ip
2547
        elif field == "bridge":
2548
          val = instance.nics[0].bridge
2549
        elif field == "mac":
2550
          val = instance.nics[0].mac
2551
        elif field == "sda_size" or field == "sdb_size":
2552
          disk = instance.FindDisk(field[:3])
2553
          if disk is None:
2554
            val = None
2555
          else:
2556
            val = disk.size
2557
        elif field == "vcpus":
2558
          val = instance.vcpus
2559
        elif field == "tags":
2560
          val = list(instance.GetTags())
2561
        else:
2562
          raise errors.ParameterError(field)
2563
        iout.append(val)
2564
      output.append(iout)
2565

    
2566
    return output
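# Editor's note: an illustrative summary, not part of the original module, of
# the "status" values computed above, keyed by (primary node reachable,
# instance reported running, instance configured as up).
_EXAMPLE_INSTANCE_STATUS = {
  (False, None, None): "ERROR_nodedown",
  (True, True, True): "running",
  (True, True, False): "ERROR_up",
  (True, False, True): "ERROR_down",
  (True, False, False): "ADMIN_down",
  }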
2567

    
2568

    
2569
class LUFailoverInstance(LogicalUnit):
2570
  """Failover an instance.
2571

2572
  """
2573
  HPATH = "instance-failover"
2574
  HTYPE = constants.HTYPE_INSTANCE
2575
  _OP_REQP = ["instance_name", "ignore_consistency"]
2576
  REQ_BGL = False
2577

    
2578
  def ExpandNames(self):
2579
    self._ExpandAndLockInstance()
2580
    self.needed_locks[locking.LEVEL_NODE] = []
2581
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2582

    
2583
  def DeclareLocks(self, level):
2584
    if level == locking.LEVEL_NODE:
2585
      self._LockInstancesNodes()
2586

    
2587
  def BuildHooksEnv(self):
2588
    """Build hooks env.
2589

2590
    This runs on master, primary and secondary nodes of the instance.
2591

2592
    """
2593
    env = {
2594
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2595
      }
2596
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2597
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2598
    return env, nl, nl
2599

    
2600
  def CheckPrereq(self):
2601
    """Check prerequisites.
2602

2603
    This checks that the instance is in the cluster.
2604

2605
    """
2606
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2607
    assert self.instance is not None, \
2608
      "Cannot retrieve locked instance %s" % self.op.instance_name
2609

    
2610
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2611
      raise errors.OpPrereqError("Instance's disk layout is not"
2612
                                 " network mirrored, cannot failover.")
2613

    
2614
    secondary_nodes = instance.secondary_nodes
2615
    if not secondary_nodes:
2616
      raise errors.ProgrammerError("no secondary node but using "
2617
                                   "a mirrored disk template")
2618

    
2619
    target_node = secondary_nodes[0]
2620
    # check memory requirements on the secondary node
2621
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2622
                         instance.name, instance.memory)
2623

    
2624
    # check bridge existence
2625
    brlist = [nic.bridge for nic in instance.nics]
2626
    if not rpc.call_bridges_exist(target_node, brlist):
2627
      raise errors.OpPrereqError("One or more target bridges %s does not"
2628
                                 " exist on destination node '%s'" %
2629
                                 (brlist, target_node))
2630

    
2631
  def Exec(self, feedback_fn):
2632
    """Failover an instance.
2633

2634
    The failover is done by shutting it down on its present node and
2635
    starting it on the secondary.
2636

2637
    """
2638
    instance = self.instance
2639

    
2640
    source_node = instance.primary_node
2641
    target_node = instance.secondary_nodes[0]
2642

    
2643
    feedback_fn("* checking disk consistency between source and target")
2644
    for dev in instance.disks:
2645
      # for drbd, these are drbd over lvm
2646
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2647
        if instance.status == "up" and not self.op.ignore_consistency:
2648
          raise errors.OpExecError("Disk %s is degraded on target node,"
2649
                                   " aborting failover." % dev.iv_name)
2650

    
2651
    feedback_fn("* shutting down instance on source node")
2652
    logger.Info("Shutting down instance %s on node %s" %
2653
                (instance.name, source_node))
2654

    
2655
    if not rpc.call_instance_shutdown(source_node, instance):
2656
      if self.op.ignore_consistency:
2657
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2658
                     " anyway. Please make sure node %s is down"  %
2659
                     (instance.name, source_node, source_node))
2660
      else:
2661
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2662
                                 (instance.name, source_node))
2663

    
2664
    feedback_fn("* deactivating the instance's disks on source node")
2665
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2666
      raise errors.OpExecError("Can't shut down the instance's disks.")
2667

    
2668
    instance.primary_node = target_node
2669
    # distribute new instance config to the other nodes
2670
    self.cfg.Update(instance)
2671

    
2672
    # Only start the instance if it's marked as up
2673
    if instance.status == "up":
2674
      feedback_fn("* activating the instance's disks on target node")
2675
      logger.Info("Starting instance %s on node %s" %
2676
                  (instance.name, target_node))
2677

    
2678
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2679
                                               ignore_secondaries=True)
2680
      if not disks_ok:
2681
        _ShutdownInstanceDisks(instance, self.cfg)
2682
        raise errors.OpExecError("Can't activate the instance's disks")
2683

    
2684
      feedback_fn("* starting the instance on the target node")
2685
      if not rpc.call_instance_start(target_node, instance, None):
2686
        _ShutdownInstanceDisks(instance, self.cfg)
2687
        raise errors.OpExecError("Could not start instance %s on node %s." %
2688
                                 (instance.name, target_node))
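# Editor's note: a brief recap, not part of the original code, of the
# failover sequence implemented by LUFailoverInstance.Exec above:
#   1. check disk consistency on the target (old secondary) node
#   2. shut the instance down on the source node
#   3. shut down its disks everywhere (ignoring primary-node errors)
#   4. flip instance.primary_node to the target node and cfg.Update()
#   5. if the instance was marked up, reassemble the disks on the new
#      primary and start the instance there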
2689

    
2690

    
2691
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2692
  """Create a tree of block devices on the primary node.
2693

2694
  This always creates all devices.
2695

2696
  """
2697
  if device.children:
2698
    for child in device.children:
2699
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2700
        return False
2701

    
2702
  cfg.SetDiskID(device, node)
2703
  new_id = rpc.call_blockdev_create(node, device, device.size,
2704
                                    instance.name, True, info)
2705
  if not new_id:
2706
    return False
2707
  if device.physical_id is None:
2708
    device.physical_id = new_id
2709
  return True
2710

    
2711

    
2712
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2713
  """Create a tree of block devices on a secondary node.
2714

2715
  If this device type has to be created on secondaries, create it and
2716
  all its children.
2717

2718
  If not, just recurse to children keeping the same 'force' value.
2719

2720
  """
2721
  if device.CreateOnSecondary():
2722
    force = True
2723
  if device.children:
2724
    for child in device.children:
2725
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2726
                                        child, force, info):
2727
        return False
2728

    
2729
  if not force:
2730
    return True
2731
  cfg.SetDiskID(device, node)
2732
  new_id = rpc.call_blockdev_create(node, device, device.size,
2733
                                    instance.name, False, info)
2734
  if not new_id:
2735
    return False
2736
  if device.physical_id is None:
2737
    device.physical_id = new_id
2738
  return True
2739

    
2740

    
2741
def _GenerateUniqueNames(cfg, exts):
2742
  """Generate a suitable LV name.
2743

2744
  This will generate a logical volume name for the given instance.
2745

2746
  """
2747
  results = []
2748
  for val in exts:
2749
    new_id = cfg.GenerateUniqueID()
2750
    results.append("%s%s" % (new_id, val))
2751
  return results
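# Editor's note: an illustrative example, not part of the original code.
# Assuming cfg.GenerateUniqueID() returns UUID-like strings, a call such as
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
# returns something like
#   ["<unique-id-1>.sda", "<unique-id-2>.sdb"]
# (a fresh id is generated for every extension); these strings are later used
# as LV names inside the cluster's volume group.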
2752

    
2753

    
2754
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2755
  """Generate a drbd8 device complete with its children.
2756

2757
  """
2758
  port = cfg.AllocatePort()
2759
  vgname = cfg.GetVGName()
2760
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2761
                          logical_id=(vgname, names[0]))
2762
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2763
                          logical_id=(vgname, names[1]))
2764
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2765
                          logical_id = (primary, secondary, port),
2766
                          children = [dev_data, dev_meta],
2767
                          iv_name=iv_name)
2768
  return drbd_dev
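# Editor's note: an illustrative sketch, not part of the original code, of
# the disk tree built above for a 10240 MiB "sda" between node1 and node2,
# with an allocated port of 11000 and "xenvg" as the volume group (all values
# invented):
#   Disk(LD_DRBD8, size=10240, iv_name="sda",
#        logical_id=("node1", "node2", 11000),
#        children=[Disk(LD_LV, size=10240, logical_id=("xenvg", names[0])),
#                  Disk(LD_LV, size=128, logical_id=("xenvg", names[1]))])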
2769

    
2770

    
2771
def _GenerateDiskTemplate(cfg, template_name,
2772
                          instance_name, primary_node,
2773
                          secondary_nodes, disk_sz, swap_sz,
2774
                          file_storage_dir, file_driver):
2775
  """Generate the entire disk layout for a given template type.
2776

2777
  """
2778
  #TODO: compute space requirements
2779

    
2780
  vgname = cfg.GetVGName()
2781
  if template_name == constants.DT_DISKLESS:
2782
    disks = []
2783
  elif template_name == constants.DT_PLAIN:
2784
    if len(secondary_nodes) != 0:
2785
      raise errors.ProgrammerError("Wrong template configuration")
2786

    
2787
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2788
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2789
                           logical_id=(vgname, names[0]),
2790
                           iv_name = "sda")
2791
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2792
                           logical_id=(vgname, names[1]),
2793
                           iv_name = "sdb")
2794
    disks = [sda_dev, sdb_dev]
2795
  elif template_name == constants.DT_DRBD8:
2796
    if len(secondary_nodes) != 1:
2797
      raise errors.ProgrammerError("Wrong template configuration")
2798
    remote_node = secondary_nodes[0]
2799
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2800
                                       ".sdb_data", ".sdb_meta"])
2801
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2802
                                         disk_sz, names[0:2], "sda")
2803
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2804
                                         swap_sz, names[2:4], "sdb")
2805
    disks = [drbd_sda_dev, drbd_sdb_dev]
2806
  elif template_name == constants.DT_FILE:
2807
    if len(secondary_nodes) != 0:
2808
      raise errors.ProgrammerError("Wrong template configuration")
2809

    
2810
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2811
                                iv_name="sda", logical_id=(file_driver,
2812
                                "%s/sda" % file_storage_dir))
2813
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2814
                                iv_name="sdb", logical_id=(file_driver,
2815
                                "%s/sdb" % file_storage_dir))
2816
    disks = [file_sda_dev, file_sdb_dev]
2817
  else:
2818
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2819
  return disks
2820

    
2821

    
2822
def _GetInstanceInfoText(instance):
2823
  """Compute that text that should be added to the disk's metadata.
2824

2825
  """
2826
  return "originstname+%s" % instance.name
2827

    
2828

    
2829
def _CreateDisks(cfg, instance):
2830
  """Create all disks for an instance.
2831

2832
  This abstracts away some work from AddInstance.
2833

2834
  Args:
2835
    instance: the instance object
2836

2837
  Returns:
2838
    True or False showing the success of the creation process
2839

2840
  """
2841
  info = _GetInstanceInfoText(instance)
2842

    
2843
  if instance.disk_template == constants.DT_FILE:
2844
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2845
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2846
                                              file_storage_dir)
2847

    
2848
    if not result:
2849
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2850
      return False
2851

    
2852
    if not result[0]:
2853
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2854
      return False
2855

    
2856
  for device in instance.disks:
2857
    logger.Info("creating volume %s for instance %s" %
2858
                (device.iv_name, instance.name))
2859
    #HARDCODE
2860
    for secondary_node in instance.secondary_nodes:
2861
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2862
                                        device, False, info):
2863
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2864
                     (device.iv_name, device, secondary_node))
2865
        return False
2866
    #HARDCODE
2867
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2868
                                    instance, device, info):
2869
      logger.Error("failed to create volume %s on primary!" %
2870
                   device.iv_name)
2871
      return False
2872

    
2873
  return True
2874

    
2875

    
2876
def _RemoveDisks(instance, cfg):
2877
  """Remove all disks for an instance.
2878

2879
  This abstracts away some work from `AddInstance()` and
2880
  `RemoveInstance()`. Note that in case some of the devices couldn't
2881
  be removed, the removal will continue with the other ones (compare
2882
  with `_CreateDisks()`).
2883

2884
  Args:
2885
    instance: the instance object
2886

2887
  Returns:
2888
    True or False showing the success of the removal process
2889

2890
  """
2891
  logger.Info("removing block devices for instance %s" % instance.name)
2892

    
2893
  result = True
2894
  for device in instance.disks:
2895
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2896
      cfg.SetDiskID(disk, node)
2897
      if not rpc.call_blockdev_remove(node, disk):
2898
        logger.Error("could not remove block device %s on node %s,"
2899
                     " continuing anyway" %
2900
                     (device.iv_name, node))
2901
        result = False
2902

    
2903
  if instance.disk_template == constants.DT_FILE:
2904
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2905
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2906
                                            file_storage_dir):
2907
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2908
      result = False
2909

    
2910
  return result
2911

    
2912

    
2913
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2914
  """Compute disk size requirements in the volume group
2915

2916
  This is currently hard-coded for the two-drive layout.
2917

2918
  """
2919
  # Required free disk space as a function of disk and swap space
2920
  req_size_dict = {
2921
    constants.DT_DISKLESS: None,
2922
    constants.DT_PLAIN: disk_size + swap_size,
2923
    # 256 MB are added for DRBD metadata, 128 MB for each DRBD device
2924
    constants.DT_DRBD8: disk_size + swap_size + 256,
2925
    constants.DT_FILE: None,
2926
  }
2927

    
2928
  if disk_template not in req_size_dict:
2929
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2930
                                 " is unknown" %  disk_template)
2931

    
2932
  return req_size_dict[disk_template]
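# Editor's note: a worked example, not part of the original module, of the
# sizing rules implemented above.
def _ExampleDiskSizing():
  """Illustrative only: free VG space needed for a 10240/4096 MiB instance.

  DT_PLAIN needs 10240 + 4096 = 14336 MiB, DT_DRBD8 adds 256 MiB of DRBD
  metadata for a total of 14592 MiB, and the diskless/file templates return
  None because they use no LVM space.

  """
  return _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)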
2933

    
2934

    
2935
class LUCreateInstance(LogicalUnit):
2936
  """Create an instance.
2937

2938
  """
2939
  HPATH = "instance-add"
2940
  HTYPE = constants.HTYPE_INSTANCE
2941
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2942
              "disk_template", "swap_size", "mode", "start", "vcpus",
2943
              "wait_for_sync", "ip_check", "mac"]
2944

    
2945
  def _RunAllocator(self):
2946
    """Run the allocator based on input opcode.
2947

2948
    """
2949
    disks = [{"size": self.op.disk_size, "mode": "w"},
2950
             {"size": self.op.swap_size, "mode": "w"}]
2951
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2952
             "bridge": self.op.bridge}]
2953
    ial = IAllocator(self.cfg, self.sstore,
2954
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2955
                     name=self.op.instance_name,
2956
                     disk_template=self.op.disk_template,
2957
                     tags=[],
2958
                     os=self.op.os_type,
2959
                     vcpus=self.op.vcpus,
2960
                     mem_size=self.op.mem_size,
2961
                     disks=disks,
2962
                     nics=nics,
2963
                     )
2964

    
2965
    ial.Run(self.op.iallocator)
2966
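    # Note: the IAllocator result checked below exposes (at least) the
    # 'success', 'info', 'nodes' and 'required_nodes' attributes; for a
    # mirrored disk template two nodes are expected and the second one is
    # used as the secondary node (snode).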

    
2967
    if not ial.success:
2968
      raise errors.OpPrereqError("Can't compute nodes using"
2969
                                 " iallocator '%s': %s" % (self.op.iallocator,
2970
                                                           ial.info))
2971
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
2975
    self.op.pnode = ial.nodes[0]
2976
    logger.ToStdout("Selected nodes for the instance: %s" %
2977
                    (", ".join(ial.nodes),))
2978
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2979
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2980
    if ial.required_nodes == 2:
2981
      self.op.snode = ial.nodes[1]
2982

    
2983
  def BuildHooksEnv(self):
2984
    """Build hooks env.
2985

2986
    This runs on master, primary and secondary nodes of the instance.
2987

2988
    """
2989
    env = {
2990
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2991
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2992
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2993
      "INSTANCE_ADD_MODE": self.op.mode,
2994
      }
2995
    if self.op.mode == constants.INSTANCE_IMPORT:
2996
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2997
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2998
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2999

    
3000
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3001
      primary_node=self.op.pnode,
3002
      secondary_nodes=self.secondaries,
3003
      status=self.instance_status,
3004
      os_type=self.op.os_type,
3005
      memory=self.op.mem_size,
3006
      vcpus=self.op.vcpus,
3007
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3008
    ))
3009

    
3010
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3011
          self.secondaries)
3012
    return env, nl, nl
3013

    
3014

    
3015
  def CheckPrereq(self):
3016
    """Check prerequisites.
3017

3018
    """
3019
    # set optional parameters to none if they don't exist
3020
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3021
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3022
                 "vnc_bind_address"]:
3023
      if not hasattr(self.op, attr):
3024
        setattr(self.op, attr, None)
3025

    
3026
    if self.op.mode not in (constants.INSTANCE_CREATE,
3027
                            constants.INSTANCE_IMPORT):
3028
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3029
                                 self.op.mode)
3030

    
3031
    if (not self.cfg.GetVGName() and
3032
        self.op.disk_template not in constants.DTS_NOT_LVM):
3033
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3034
                                 " instances")
3035

    
3036
    if self.op.mode == constants.INSTANCE_IMPORT:
3037
      src_node = getattr(self.op, "src_node", None)
3038
      src_path = getattr(self.op, "src_path", None)
3039
      if src_node is None or src_path is None:
3040
        raise errors.OpPrereqError("Importing an instance requires source"
3041
                                   " node and path options")
3042
      src_node_full = self.cfg.ExpandNodeName(src_node)
3043
      if src_node_full is None:
3044
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3045
      self.op.src_node = src_node = src_node_full
3046

    
3047
      if not os.path.isabs(src_path):
3048
        raise errors.OpPrereqError("The source path must be absolute")
3049

    
3050
      export_info = rpc.call_export_info(src_node, src_path)
3051

    
3052
      if not export_info:
3053
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3054

    
3055
      if not export_info.has_section(constants.INISECT_EXP):
3056
        raise errors.ProgrammerError("Corrupted export config")
3057

    
3058
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3059
      if (int(ei_version) != constants.EXPORT_VERSION):
3060
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3061
                                   (ei_version, constants.EXPORT_VERSION))
3062

    
3063
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3064
        raise errors.OpPrereqError("Can't import instance with more than"
3065
                                   " one data disk")
3066

    
3067
      # FIXME: are the old os-es, disk sizes, etc. useful?
3068
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3069
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3070
                                                         'disk0_dump'))
3071
      self.src_image = diskimage
3072
    else: # INSTANCE_CREATE
3073
      if getattr(self.op, "os_type", None) is None:
3074
        raise errors.OpPrereqError("No guest OS specified")
3075

    
3076
    #### instance parameters check
3077

    
3078
    # disk template and mirror node verification
3079
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3080
      raise errors.OpPrereqError("Invalid disk template name")
3081

    
3082
    # instance name verification
3083
    hostname1 = utils.HostInfo(self.op.instance_name)
3084

    
3085
    self.op.instance_name = instance_name = hostname1.name
3086
    instance_list = self.cfg.GetInstanceList()
3087
    if instance_name in instance_list:
3088
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3089
                                 instance_name)
3090

    
3091
    # ip validity checks
3092
    ip = getattr(self.op, "ip", None)
3093
    if ip is None or ip.lower() == "none":
3094
      inst_ip = None
3095
    elif ip.lower() == "auto":
3096
      inst_ip = hostname1.ip
3097
    else:
3098
      if not utils.IsValidIP(ip):
3099
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3100
                                   " like a valid IP" % ip)
3101
      inst_ip = ip
3102
    self.inst_ip = self.op.ip = inst_ip
3103

    
3104
    if self.op.start and not self.op.ip_check:
3105
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3106
                                 " adding an instance in start mode")
3107

    
3108
    if self.op.ip_check:
3109
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3110
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3111
                                   (hostname1.ip, instance_name))
3112

    
3113
    # MAC address verification
3114
    if self.op.mac != "auto":
3115
      if not utils.IsValidMac(self.op.mac.lower()):
3116
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3117
                                   self.op.mac)
3118

    
3119
    # bridge verification
3120
    bridge = getattr(self.op, "bridge", None)
3121
    if bridge is None:
3122
      self.op.bridge = self.cfg.GetDefBridge()
3123
    else:
3124
      self.op.bridge = bridge
3125

    
3126
    # boot order verification
3127
    if self.op.hvm_boot_order is not None:
3128
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3129
        raise errors.OpPrereqError("invalid boot order specified,"
3130
                                   " must be one or more of [acdn]")
3131
    # file storage checks
3132
    if (self.op.file_driver and
3133
        not self.op.file_driver in constants.FILE_DRIVER):
3134
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3135
                                 self.op.file_driver)
3136

    
3137
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3138
      raise errors.OpPrereqError("File storage directory not a relative"
3139
                                 " path")
3140
    #### allocator run
3141

    
3142
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3143
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3144
                                 " node must be given")
3145

    
3146
    if self.op.iallocator is not None:
3147
      self._RunAllocator()
3148

    
3149
    #### node related checks
3150

    
3151
    # check primary node
3152
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3153
    if pnode is None:
3154
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3155
                                 self.op.pnode)
3156
    self.op.pnode = pnode.name
3157
    self.pnode = pnode
3158
    self.secondaries = []
3159

    
3160
    # mirror node verification
3161
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3162
      if getattr(self.op, "snode", None) is None:
3163
        raise errors.OpPrereqError("The networked disk templates need"
3164
                                   " a mirror node")
3165

    
3166
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3167
      if snode_name is None:
3168
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3169
                                   self.op.snode)
3170
      elif snode_name == pnode.name:
3171
        raise errors.OpPrereqError("The secondary node cannot be"
3172
                                   " the primary node.")
3173
      self.secondaries.append(snode_name)
3174

    
3175
    req_size = _ComputeDiskSize(self.op.disk_template,
3176
                                self.op.disk_size, self.op.swap_size)
3177

    
3178
    # Check lv size requirements
3179
    if req_size is not None:
3180
      nodenames = [pnode.name] + self.secondaries
3181
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3182
      for node in nodenames:
3183
        info = nodeinfo.get(node, None)
3184
        if not info:
3185
          raise errors.OpPrereqError("Cannot get current information"
3186
                                     " from node '%s'" % node)
3187
        vg_free = info.get('vg_free', None)
3188
        if not isinstance(vg_free, int):
3189
          raise errors.OpPrereqError("Can't compute free disk space on"
3190
                                     " node %s" % node)
3191
        if req_size > info['vg_free']:
3192
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3193
                                     " %d MB available, %d MB required" %
3194
                                     (node, info['vg_free'], req_size))
3195

    
3196
    # os verification
3197
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3198
    if not os_obj:
3199
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3200
                                 " primary node"  % self.op.os_type)
3201

    
3202
    if self.op.kernel_path == constants.VALUE_NONE:
3203
      raise errors.OpPrereqError("Can't set instance kernel to none")
3204

    
3205

    
3206
    # bridge check on primary node
3207
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3208
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3209
                                 " destination node '%s'" %
3210
                                 (self.op.bridge, pnode.name))
3211

    
3212
    # memory check on primary node
3213
    if self.op.start:
3214
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3215
                           "creating instance %s" % self.op.instance_name,
3216
                           self.op.mem_size)
3217

    
3218
    # hvm_cdrom_image_path verification
3219
    if self.op.hvm_cdrom_image_path is not None:
3220
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3221
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3222
                                   " be an absolute path or None, not %s" %
3223
                                   self.op.hvm_cdrom_image_path)
3224
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3225
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3226
                                   " regular file or a symlink pointing to"
3227
                                   " an existing regular file, not %s" %
3228
                                   self.op.hvm_cdrom_image_path)
3229

    
3230
    # vnc_bind_address verification
3231
    if self.op.vnc_bind_address is not None:
3232
      if not utils.IsValidIP(self.op.vnc_bind_address):
3233
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3234
                                   " like a valid IP address" %
3235
                                   self.op.vnc_bind_address)
3236

    
3237
    if self.op.start:
3238
      self.instance_status = 'up'
3239
    else:
3240
      self.instance_status = 'down'
3241

    
3242
  def Exec(self, feedback_fn):
3243
    """Create and add the instance to the cluster.
3244

3245
    """
3246
    instance = self.op.instance_name
3247
    pnode_name = self.pnode.name
3248

    
3249
    if self.op.mac == "auto":
3250
      mac_address = self.cfg.GenerateMAC()
3251
    else:
3252
      mac_address = self.op.mac
3253

    
3254
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3255
    if self.inst_ip is not None:
3256
      nic.ip = self.inst_ip
3257

    
3258
    ht_kind = self.sstore.GetHypervisorType()
3259
    if ht_kind in constants.HTS_REQ_PORT:
3260
      network_port = self.cfg.AllocatePort()
3261
    else:
3262
      network_port = None
3263
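    # a network (VNC console) port is only reserved for hypervisors that
    # need one, e.g. HVM instances exposing their console over VNC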

    
3264
    if self.op.vnc_bind_address is None:
3265
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3266

    
3267
    # this is needed because os.path.join does not accept None arguments
3268
    if self.op.file_storage_dir is None:
3269
      string_file_storage_dir = ""
3270
    else:
3271
      string_file_storage_dir = self.op.file_storage_dir
3272

    
3273
    # build the full file storage dir path
3274
    file_storage_dir = os.path.normpath(os.path.join(
3275
                                        self.sstore.GetFileStorageDir(),
3276
                                        string_file_storage_dir, instance))
3277
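    # Illustrative example (values assumed): with a cluster file storage
    # directory of "/srv/ganeti/file-storage" and no extra subdirectory
    # given, this yields "/srv/ganeti/file-storage/<instance name>".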

    
3278

    
3279
    disks = _GenerateDiskTemplate(self.cfg,
3280
                                  self.op.disk_template,
3281
                                  instance, pnode_name,
3282
                                  self.secondaries, self.op.disk_size,
3283
                                  self.op.swap_size,
3284
                                  file_storage_dir,
3285
                                  self.op.file_driver)
3286

    
3287
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3288
                            primary_node=pnode_name,
3289
                            memory=self.op.mem_size,
3290
                            vcpus=self.op.vcpus,
3291
                            nics=[nic], disks=disks,
3292
                            disk_template=self.op.disk_template,
3293
                            status=self.instance_status,
3294
                            network_port=network_port,
3295
                            kernel_path=self.op.kernel_path,
3296
                            initrd_path=self.op.initrd_path,
3297
                            hvm_boot_order=self.op.hvm_boot_order,
3298
                            hvm_acpi=self.op.hvm_acpi,
3299
                            hvm_pae=self.op.hvm_pae,
3300
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3301
                            vnc_bind_address=self.op.vnc_bind_address,
3302
                            )
3303

    
3304
    feedback_fn("* creating instance disks...")
3305
    if not _CreateDisks(self.cfg, iobj):
3306
      _RemoveDisks(iobj, self.cfg)
3307
      raise errors.OpExecError("Device creation failed, reverting...")
3308

    
3309
    feedback_fn("adding instance %s to cluster config" % instance)
3310

    
3311
    self.cfg.AddInstance(iobj)
3312
    # Add the new instance to the Ganeti Lock Manager
3313
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3314

    
3315
    if self.op.wait_for_sync:
3316
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3317
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3318
      # make sure the disks are not degraded (still sync-ing is ok)
3319
      time.sleep(15)
3320
      feedback_fn("* checking mirrors status")
3321
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3322
    else:
3323
      disk_abort = False
3324

    
3325
    if disk_abort:
3326
      _RemoveDisks(iobj, self.cfg)
3327
      self.cfg.RemoveInstance(iobj.name)
3328
      # Remove the new instance from the Ganeti Lock Manager
3329
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3330
      raise errors.OpExecError("There are some degraded disks for"
3331
                               " this instance")
3332

    
3333
    feedback_fn("creating os for instance %s on node %s" %
3334
                (instance, pnode_name))
3335

    
3336
    if iobj.disk_template != constants.DT_DISKLESS:
3337
      if self.op.mode == constants.INSTANCE_CREATE:
3338
        feedback_fn("* running the instance OS create scripts...")
3339
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3340
          raise errors.OpExecError("could not add os for instance %s"
3341
                                   " on node %s" %
3342
                                   (instance, pnode_name))
3343

    
3344
      elif self.op.mode == constants.INSTANCE_IMPORT:
3345
        feedback_fn("* running the instance OS import scripts...")
3346
        src_node = self.op.src_node
3347
        src_image = self.src_image
3348
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3349
                                                src_node, src_image):
3350
          raise errors.OpExecError("Could not import os for instance"
3351
                                   " %s on node %s" %
3352
                                   (instance, pnode_name))
3353
      else:
3354
        # also checked in the prereq part
3355
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3356
                                     % self.op.mode)
3357

    
3358
    if self.op.start:
3359
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3360
      feedback_fn("* starting instance...")
3361
      if not rpc.call_instance_start(pnode_name, iobj, None):
3362
        raise errors.OpExecError("Could not start instance")
3363

    
3364

    
3365
class LUConnectConsole(NoHooksLU):
3366
  """Connect to an instance's console.
3367

3368
  This is somewhat special in that it returns the command line that
3369
  you need to run on the master node in order to connect to the
3370
  console.
3371

3372
  """
3373
  _OP_REQP = ["instance_name"]
3374
  REQ_BGL = False
3375

    
3376
  def ExpandNames(self):
3377
    self._ExpandAndLockInstance()
3378

    
3379
  def CheckPrereq(self):
3380
    """Check prerequisites.
3381

3382
    This checks that the instance is in the cluster.
3383

3384
    """
3385
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3386
    assert self.instance is not None, \
3387
      "Cannot retrieve locked instance %s" % self.op.instance_name
3388

    
3389
  def Exec(self, feedback_fn):
3390
    """Connect to the console of an instance
3391

3392
    """
3393
    instance = self.instance
3394
    node = instance.primary_node
3395

    
3396
    node_insts = rpc.call_instance_list([node])[node]
3397
    if node_insts is False:
3398
      raise errors.OpExecError("Can't connect to node %s." % node)
3399

    
3400
    if instance.name not in node_insts:
3401
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3402

    
3403
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3404

    
3405
    hyper = hypervisor.GetHypervisor()
3406
    console_cmd = hyper.GetShellCommandForConsole(instance)
3407
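    # console_cmd is hypervisor specific (for the Xen hypervisors this is
    # typically an "xm console <instance>" invocation); it is not run here
    # but wrapped into an interactive ssh command line below.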

    
3408
    # build ssh cmdline
3409
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3410

    
3411

    
3412
class LUReplaceDisks(LogicalUnit):
3413
  """Replace the disks of an instance.
3414

3415
  """
3416
  HPATH = "mirrors-replace"
3417
  HTYPE = constants.HTYPE_INSTANCE
3418
  _OP_REQP = ["instance_name", "mode", "disks"]
3419

    
3420
  def _RunAllocator(self):
3421
    """Compute a new secondary node using an IAllocator.
3422

3423
    """
3424
    ial = IAllocator(self.cfg, self.sstore,
3425
                     mode=constants.IALLOCATOR_MODE_RELOC,
3426
                     name=self.op.instance_name,
3427
                     relocate_from=[self.sec_node])
3428

    
3429
    ial.Run(self.op.iallocator)
3430

    
3431
    if not ial.success:
3432
      raise errors.OpPrereqError("Can't compute nodes using"
3433
                                 " iallocator '%s': %s" % (self.op.iallocator,
3434
                                                           ial.info))
3435
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3439
    self.op.remote_node = ial.nodes[0]
3440
    logger.ToStdout("Selected new secondary for the instance: %s" %
3441
                    self.op.remote_node)
3442

    
3443
  def BuildHooksEnv(self):
3444
    """Build hooks env.
3445

3446
    This runs on the master, the primary and all the secondaries.
3447

3448
    """
3449
    env = {
3450
      "MODE": self.op.mode,
3451
      "NEW_SECONDARY": self.op.remote_node,
3452
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3453
      }
3454
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3455
    nl = [
3456
      self.sstore.GetMasterNode(),
3457
      self.instance.primary_node,
3458
      ]
3459
    if self.op.remote_node is not None:
3460
      nl.append(self.op.remote_node)
3461
    return env, nl, nl
3462

    
3463
  def CheckPrereq(self):
3464
    """Check prerequisites.
3465

3466
    This checks that the instance is in the cluster.
3467

3468
    """
3469
    if not hasattr(self.op, "remote_node"):
3470
      self.op.remote_node = None
3471

    
3472
    instance = self.cfg.GetInstanceInfo(
3473
      self.cfg.ExpandInstanceName(self.op.instance_name))
3474
    if instance is None:
3475
      raise errors.OpPrereqError("Instance '%s' not known" %
3476
                                 self.op.instance_name)
3477
    self.instance = instance
3478
    self.op.instance_name = instance.name
3479

    
3480
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3481
      raise errors.OpPrereqError("Instance's disk layout is not"
3482
                                 " network mirrored.")
3483

    
3484
    if len(instance.secondary_nodes) != 1:
3485
      raise errors.OpPrereqError("The instance has a strange layout,"
3486
                                 " expected one secondary but found %d" %
3487
                                 len(instance.secondary_nodes))
3488

    
3489
    self.sec_node = instance.secondary_nodes[0]
3490

    
3491
    ia_name = getattr(self.op, "iallocator", None)
3492
    if ia_name is not None:
3493
      if self.op.remote_node is not None:
3494
        raise errors.OpPrereqError("Give either the iallocator or the new"
3495
                                   " secondary, not both")
3496
      self.op.remote_node = self._RunAllocator()
3497

    
3498
    remote_node = self.op.remote_node
3499
    if remote_node is not None:
3500
      remote_node = self.cfg.ExpandNodeName(remote_node)
3501
      if remote_node is None:
3502
        raise errors.OpPrereqError("Node '%s' not known" %
3503
                                   self.op.remote_node)
3504
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3505
    else:
3506
      self.remote_node_info = None
3507
    if remote_node == instance.primary_node:
3508
      raise errors.OpPrereqError("The specified node is the primary node of"
3509
                                 " the instance.")
3510
    elif remote_node == self.sec_node:
3511
      if self.op.mode == constants.REPLACE_DISK_SEC:
3512
        # this is for DRBD8, where we can't execute the same mode of
3513
        # replacement as for drbd7 (no different port allocated)
3514
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3515
                                   " replacement")
3516
    if instance.disk_template == constants.DT_DRBD8:
3517
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3518
          remote_node is not None):
3519
        # switch to replace secondary mode
3520
        self.op.mode = constants.REPLACE_DISK_SEC
3521

    
3522
      if self.op.mode == constants.REPLACE_DISK_ALL:
3523
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3524
                                   " secondary disk replacement, not"
3525
                                   " both at once")
3526
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3527
        if remote_node is not None:
3528
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3529
                                     " the secondary while doing a primary"
3530
                                     " node disk replacement")
3531
        self.tgt_node = instance.primary_node
3532
        self.oth_node = instance.secondary_nodes[0]
3533
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3534
        self.new_node = remote_node # this can be None, in which case
3535
                                    # we don't change the secondary
3536
        self.tgt_node = instance.secondary_nodes[0]
3537
        self.oth_node = instance.primary_node
3538
      else:
3539
        raise errors.ProgrammerError("Unhandled disk replace mode")
3540

    
3541
    for name in self.op.disks:
3542
      if instance.FindDisk(name) is None:
3543
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3544
                                   (name, instance.name))
3545
    self.op.remote_node = remote_node
3546

    
3547
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
3564
    steps_total = 6
3565
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3566
    instance = self.instance
3567
    iv_names = {}
3568
    vgname = self.cfg.GetVGName()
3569
    # start of work
3570
    cfg = self.cfg
3571
    tgt_node = self.tgt_node
3572
    oth_node = self.oth_node
3573

    
3574
    # Step: check device activation
3575
    self.proc.LogStep(1, steps_total, "check device existence")
3576
    info("checking volume groups")
3577
    my_vg = cfg.GetVGName()
3578
    results = rpc.call_vg_list([oth_node, tgt_node])
3579
    if not results:
3580
      raise errors.OpExecError("Can't list volume groups on the nodes")
3581
    for node in oth_node, tgt_node:
3582
      res = results.get(node, False)
3583
      if not res or my_vg not in res:
3584
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3585
                                 (my_vg, node))
3586
    for dev in instance.disks:
3587
      if not dev.iv_name in self.op.disks:
3588
        continue
3589
      for node in tgt_node, oth_node:
3590
        info("checking %s on %s" % (dev.iv_name, node))
3591
        cfg.SetDiskID(dev, node)
3592
        if not rpc.call_blockdev_find(node, dev):
3593
          raise errors.OpExecError("Can't find device %s on node %s" %
3594
                                   (dev.iv_name, node))
3595

    
3596
    # Step: check other node consistency
3597
    self.proc.LogStep(2, steps_total, "check peer consistency")
3598
    for dev in instance.disks:
3599
      if not dev.iv_name in self.op.disks:
3600
        continue
3601
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3602
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3603
                                   oth_node==instance.primary_node):
3604
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3605
                                 " to replace disks on this node (%s)" %
3606
                                 (oth_node, tgt_node))
3607

    
3608
    # Step: create new storage
3609
    self.proc.LogStep(3, steps_total, "allocate new storage")
3610
    for dev in instance.disks:
3611
      if not dev.iv_name in self.op.disks:
3612
        continue
3613
      size = dev.size
3614
      cfg.SetDiskID(dev, tgt_node)
3615
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3616
      names = _GenerateUniqueNames(cfg, lv_names)
3617
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3618
                             logical_id=(vgname, names[0]))
3619
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3620
                             logical_id=(vgname, names[1]))
3621
      new_lvs = [lv_data, lv_meta]
3622
      old_lvs = dev.children
3623
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3624
      info("creating new local storage on %s for %s" %
3625
           (tgt_node, dev.iv_name))
3626
      # since we *always* want to create this LV, we use the
3627
      # _Create...OnPrimary (which forces the creation), even if we
3628
      # are talking about the secondary node
3629
      for new_lv in new_lvs:
3630
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3631
                                        _GetInstanceInfoText(instance)):
3632
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3633
                                   " node '%s'" %
3634
                                   (new_lv.logical_id[1], tgt_node))
3635

    
3636
    # Step: for each lv, detach+rename*2+attach
3637
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3638
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3639
      info("detaching %s drbd from local storage" % dev.iv_name)
3640
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3641
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3642
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3643
      #dev.children = []
3644
      #cfg.Update(instance)
3645

    
3646
      # ok, we created the new LVs, so now we know we have the needed
3647
      # storage; as such, we proceed on the target node to rename
3648
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3649
      # using the assumption that logical_id == physical_id (which in
3650
      # turn is the unique_id on that node)
3651

    
3652
      # FIXME(iustin): use a better name for the replaced LVs
3653
      temp_suffix = int(time.time())
3654
      ren_fn = lambda d, suff: (d.physical_id[0],
3655
                                d.physical_id[1] + "_replaced-%s" % suff)
3656
      # build the rename list based on what LVs exist on the node
3657
      rlist = []
3658
      for to_ren in old_lvs:
3659
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3660
        if find_res is not None: # device exists
3661
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3662

    
3663
      info("renaming the old LVs on the target node")
3664
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3665
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3666
      # now we rename the new LVs to the old LVs
3667
      info("renaming the new LVs on the target node")
3668
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3669
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3670
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3671

    
3672
      for old, new in zip(old_lvs, new_lvs):
3673
        new.logical_id = old.logical_id
3674
        cfg.SetDiskID(new, tgt_node)
3675

    
3676
      for disk in old_lvs:
3677
        disk.logical_id = ren_fn(disk, temp_suffix)
3678
        cfg.SetDiskID(disk, tgt_node)
3679

    
3680
      # now that the new lvs have the old name, we can add them to the device
3681
      info("adding new mirror component on %s" % tgt_node)
3682
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv,
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")
3688

    
3689
      dev.children = new_lvs
3690
      cfg.Update(instance)
3691

    
3692
    # Step: wait for sync
3693

    
3694
    # this can fail as the old devices are degraded and _WaitForSync
3695
    # does a combined result over all disks, so we don't check its
3696
    # return value
3697
    self.proc.LogStep(5, steps_total, "sync devices")
3698
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3699

    
3700
    # so check manually all the devices
3701
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3702
      cfg.SetDiskID(dev, instance.primary_node)
3703
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3704
      if is_degr:
3705
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3706

    
3707
    # Step: remove old storage
3708
    self.proc.LogStep(6, steps_total, "removing old storage")
3709
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3710
      info("remove logical volumes for %s" % name)
3711
      for lv in old_lvs:
3712
        cfg.SetDiskID(lv, tgt_node)
3713
        if not rpc.call_blockdev_remove(tgt_node, lv):
3714
          warning("Can't remove old LV", hint="manually remove unused LVs")
3715
          continue
3716

    
3717
  def _ExecD8Secondary(self, feedback_fn):
3718
    """Replace the secondary node for drbd8.
3719

3720
    The algorithm for replace is quite complicated:
3721
      - for all disks of the instance:
3722
        - create new LVs on the new node with same names
3723
        - shutdown the drbd device on the old secondary
3724
        - disconnect the drbd network on the primary
3725
        - create the drbd device on the new secondary
3726
        - network attach the drbd on the primary, using an artifice:
3727
          the drbd code for Attach() will connect to the network if it
3728
          finds a device which is connected to the good local disks but
3729
          not network enabled
3730
      - wait for sync across all devices
3731
      - remove all disks from the old secondary
3732

3733
    Failures are not very well handled.
3734

3735
    """
3736
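    # Rough picture of the steps below: fresh LVs and DRBD devices are
    # created on the new secondary, the primary DRBDs are switched to
    # standalone (detached from the old peer) and then re-attached so that
    # they connect to the new secondary, after which the resync runs.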
    steps_total = 6
3737
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3738
    instance = self.instance
3739
    iv_names = {}
3740
    vgname = self.cfg.GetVGName()
3741
    # start of work
3742
    cfg = self.cfg
3743
    old_node = self.tgt_node
3744
    new_node = self.new_node
3745
    pri_node = instance.primary_node
3746

    
3747
    # Step: check device activation
3748
    self.proc.LogStep(1, steps_total, "check device existence")
3749
    info("checking volume groups")
3750
    my_vg = cfg.GetVGName()
3751
    results = rpc.call_vg_list([pri_node, new_node])
3752
    if not results:
3753
      raise errors.OpExecError("Can't list volume groups on the nodes")
3754
    for node in pri_node, new_node:
3755
      res = results.get(node, False)
3756
      if not res or my_vg not in res:
3757
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3758
                                 (my_vg, node))
3759
    for dev in instance.disks:
3760
      if not dev.iv_name in self.op.disks:
3761
        continue
3762
      info("checking %s on %s" % (dev.iv_name, pri_node))
3763
      cfg.SetDiskID(dev, pri_node)
3764
      if not rpc.call_blockdev_find(pri_node, dev):
3765
        raise errors.OpExecError("Can't find device %s on node %s" %
3766
                                 (dev.iv_name, pri_node))
3767

    
3768
    # Step: check other node consistency
3769
    self.proc.LogStep(2, steps_total, "check peer consistency")
3770
    for dev in instance.disks:
3771
      if not dev.iv_name in self.op.disks:
3772
        continue
3773
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3774
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3775
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3776
                                 " unsafe to replace the secondary" %
3777
                                 pri_node)
3778

    
3779
    # Step: create new storage
3780
    self.proc.LogStep(3, steps_total, "allocate new storage")
3781
    for dev in instance.disks:
3782
      size = dev.size
3783
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3784
      # since we *always* want to create this LV, we use the
3785
      # _Create...OnPrimary (which forces the creation), even if we
3786
      # are talking about the secondary node
3787
      for new_lv in dev.children:
3788
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3789
                                        _GetInstanceInfoText(instance)):
3790
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3791
                                   " node '%s'" %
3792
                                   (new_lv.logical_id[1], new_node))
3793

    
3794
      iv_names[dev.iv_name] = (dev, dev.children)
3795

    
3796
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3797
    for dev in instance.disks:
3798
      size = dev.size
3799
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3800
      # create new devices on new_node
3801
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3802
                              logical_id=(pri_node, new_node,
3803
                                          dev.logical_id[2]),
3804
                              children=dev.children)
3805
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3806
                                        new_drbd, False,
3807
                                      _GetInstanceInfoText(instance)):
3808
        raise errors.OpExecError("Failed to create new DRBD on"
3809
                                 " node '%s'" % new_node)
3810

    
3811
    for dev in instance.disks:
3812
      # we have new devices, shutdown the drbd on the old secondary
3813
      info("shutting down drbd for %s on old node" % dev.iv_name)
3814
      cfg.SetDiskID(dev, old_node)
3815
      if not rpc.call_blockdev_shutdown(old_node, dev):
3816
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3817
                hint="Please cleanup this device manually as soon as possible")
3818

    
3819
    info("detaching primary drbds from the network (=> standalone)")
3820
    done = 0
3821
    for dev in instance.disks:
3822
      cfg.SetDiskID(dev, pri_node)
3823
      # set the physical (unique in bdev terms) id to None, meaning
3824
      # detach from network
3825
      dev.physical_id = (None,) * len(dev.physical_id)
3826
      # and 'find' the device, which will 'fix' it to match the
3827
      # standalone state
3828
      if rpc.call_blockdev_find(pri_node, dev):
3829
        done += 1
3830
      else:
3831
        warning("Failed to detach drbd %s from network, unusual case" %
3832
                dev.iv_name)
3833

    
3834
    if not done:
3835
      # no detaches succeeded (very unlikely)
3836
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3837

    
3838
    # if we managed to detach at least one, we update all the disks of
3839
    # the instance to point to the new secondary
3840
    info("updating instance configuration")
3841
    for dev in instance.disks:
3842
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3843
      cfg.SetDiskID(dev, pri_node)
3844
    cfg.Update(instance)
3845

    
3846
    # and now perform the drbd attach
3847
    info("attaching primary drbds to new secondary (standalone => connected)")
3848
    failures = []
3849
    for dev in instance.disks:
3850
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3851
      # since the attach is smart, it's enough to 'find' the device,
3852
      # it will automatically activate the network, if the physical_id
3853
      # is correct
3854
      cfg.SetDiskID(dev, pri_node)
3855
      if not rpc.call_blockdev_find(pri_node, dev):
3856
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3857
                "please do a gnt-instance info to see the status of disks")
3858

    
3859
    # this can fail as the old devices are degraded and _WaitForSync
3860
    # does a combined result over all disks, so we don't check its
3861
    # return value
3862
    self.proc.LogStep(5, steps_total, "sync devices")
3863
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3864

    
3865
    # so check manually all the devices
3866
    for name, (dev, old_lvs) in iv_names.iteritems():
3867
      cfg.SetDiskID(dev, pri_node)
3868
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3869
      if is_degr:
3870
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3871

    
3872
    self.proc.LogStep(6, steps_total, "removing old storage")
3873
    for name, (dev, old_lvs) in iv_names.iteritems():
3874
      info("remove logical volumes for %s" % name)
3875
      for lv in old_lvs:
3876
        cfg.SetDiskID(lv, old_node)
3877
        if not rpc.call_blockdev_remove(old_node, lv):
3878
          warning("Can't remove LV on old secondary",
3879
                  hint="Cleanup stale volumes by hand")
3880

    
3881
  def Exec(self, feedback_fn):
3882
    """Execute disk replacement.
3883

3884
    This dispatches the disk replacement to the appropriate handler.
3885

3886
    """
3887
    instance = self.instance
3888

    
3889
    # Activate the instance disks if we're replacing them on a down instance
3890
    if instance.status == "down":
3891
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3892
      self.proc.ChainOpCode(op)
3893

    
3894
    if instance.disk_template == constants.DT_DRBD8:
3895
      if self.op.remote_node is None:
3896
        fn = self._ExecD8DiskOnly
3897
      else:
3898
        fn = self._ExecD8Secondary
3899
    else:
3900
      raise errors.ProgrammerError("Unhandled disk replacement case")
3901

    
3902
    ret = fn(feedback_fn)
3903

    
3904
    # Deactivate the instance disks if we're replacing them on a down instance
3905
    if instance.status == "down":
3906
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3907
      self.proc.ChainOpCode(op)
3908

    
3909
    return ret
3910

    
3911

    
3912
class LUGrowDisk(LogicalUnit):
3913
  """Grow a disk of an instance.
3914

3915
  """
3916
  HPATH = "disk-grow"
3917
  HTYPE = constants.HTYPE_INSTANCE
3918
  _OP_REQP = ["instance_name", "disk", "amount"]
3919

    
3920
  def BuildHooksEnv(self):
3921
    """Build hooks env.
3922

3923
    This runs on the master, the primary and all the secondaries.
3924

3925
    """
3926
    env = {
3927
      "DISK": self.op.disk,
3928
      "AMOUNT": self.op.amount,
3929
      }
3930
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3931
    nl = [
3932
      self.sstore.GetMasterNode(),
3933
      self.instance.primary_node,
3934
      ]
3935
    return env, nl, nl
3936

    
3937
  def CheckPrereq(self):
3938
    """Check prerequisites.
3939

3940
    This checks that the instance is in the cluster.
3941

3942
    """
3943
    instance = self.cfg.GetInstanceInfo(
3944
      self.cfg.ExpandInstanceName(self.op.instance_name))
3945
    if instance is None:
3946
      raise errors.OpPrereqError("Instance '%s' not known" %
3947
                                 self.op.instance_name)
3948
    self.instance = instance
3949
    self.op.instance_name = instance.name
3950

    
3951
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3952
      raise errors.OpPrereqError("Instance's disk layout does not support"
3953
                                 " growing.")
3954

    
3955
    if instance.FindDisk(self.op.disk) is None:
3956
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3957
                                 (self.op.disk, instance.name))
3958

    
3959
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3960
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3961
    for node in nodenames:
3962
      info = nodeinfo.get(node, None)
3963
      if not info:
3964
        raise errors.OpPrereqError("Cannot get current information"
3965
                                   " from node '%s'" % node)
3966
      vg_free = info.get('vg_free', None)
3967
      if not isinstance(vg_free, int):
3968
        raise errors.OpPrereqError("Can't compute free disk space on"
3969
                                   " node %s" % node)
3970
      if self.op.amount > info['vg_free']:
3971
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3972
                                   " %d MiB available, %d MiB required" %
3973
                                   (node, info['vg_free'], self.op.amount))
3974

    
3975
  def Exec(self, feedback_fn):
3976
    """Execute disk grow.
3977

3978
    """
3979
    instance = self.instance
3980
    disk = instance.FindDisk(self.op.disk)
3981
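    # grow the physical device on every node, secondaries first and the
    # primary last; the new size is recorded in the configuration only once
    # all nodes have succeeded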
    for node in (instance.secondary_nodes + (instance.primary_node,)):
3982
      self.cfg.SetDiskID(disk, node)
3983
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3984
      if not result or not isinstance(result, tuple) or len(result) != 2:
3985
        raise errors.OpExecError("grow request failed to node %s" % node)
3986
      elif not result[0]:
3987
        raise errors.OpExecError("grow request failed to node %s: %s" %
3988
                                 (node, result[1]))
3989
    disk.RecordGrow(self.op.amount)
3990
    self.cfg.Update(instance)
3991
    return
3992

    
3993

    
3994
class LUQueryInstanceData(NoHooksLU):
3995
  """Query runtime instance data.
3996

3997
  """
3998
  _OP_REQP = ["instances"]
3999

    
4000
  def CheckPrereq(self):
4001
    """Check prerequisites.
4002

4003
    This only checks the optional instance list against the existing names.
4004

4005
    """
4006
    if not isinstance(self.op.instances, list):
4007
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4008
    if self.op.instances:
4009
      self.wanted_instances = []
4010
      names = self.op.instances
4011
      for name in names:
4012
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4013
        if instance is None:
4014
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4015
        self.wanted_instances.append(instance)
4016
    else:
4017
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4018
                               in self.cfg.GetInstanceList()]
4019
    return
4020

    
4021

    
4022
  def _ComputeDiskStatus(self, instance, snode, dev):
4023
    """Compute block device status.
4024

4025
    """
4026
    self.cfg.SetDiskID(dev, instance.primary_node)
4027
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4028
    if dev.dev_type in constants.LDS_DRBD:
4029
      # we change the snode then (otherwise we use the one passed in)
4030
      if dev.logical_id[0] == instance.primary_node:
4031
        snode = dev.logical_id[1]
4032
      else:
4033
        snode = dev.logical_id[0]
4034

    
4035
    if snode:
4036
      self.cfg.SetDiskID(dev, snode)
4037
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4038
    else:
4039
      dev_sstatus = None
4040

    
4041
    if dev.children:
4042
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4043
                      for child in dev.children]
4044
    else:
4045
      dev_children = []
4046

    
4047
    data = {
4048
      "iv_name": dev.iv_name,
4049
      "dev_type": dev.dev_type,
4050
      "logical_id": dev.logical_id,
4051
      "physical_id": dev.physical_id,
4052
      "pstatus": dev_pstatus,
4053
      "sstatus": dev_sstatus,
4054
      "children": dev_children,
4055
      }
4056

    
4057
    return data
4058

    
4059
  def Exec(self, feedback_fn):
4060
    """Gather and return data"""
4061
    result = {}
4062
    for instance in self.wanted_instances:
4063
      remote_info = rpc.call_instance_info(instance.primary_node,
4064
                                                instance.name)
4065
      if remote_info and "state" in remote_info:
4066
        remote_state = "up"
4067
      else:
4068
        remote_state = "down"
4069
      if instance.status == "down":
4070
        config_state = "down"
4071
      else:
4072
        config_state = "up"
4073
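      # run_state reflects what the hypervisor on the primary node reports,
      # while config_state is what the cluster configuration says the
      # instance should be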

    
4074
      disks = [self._ComputeDiskStatus(instance, None, device)
4075
               for device in instance.disks]
4076

    
4077
      idict = {
4078
        "name": instance.name,
4079
        "config_state": config_state,
4080
        "run_state": remote_state,
4081
        "pnode": instance.primary_node,
4082
        "snodes": instance.secondary_nodes,
4083
        "os": instance.os,
4084
        "memory": instance.memory,
4085
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4086
        "disks": disks,
4087
        "vcpus": instance.vcpus,
4088
        }
4089

    
4090
      htkind = self.sstore.GetHypervisorType()
4091
      if htkind == constants.HT_XEN_PVM30:
4092
        idict["kernel_path"] = instance.kernel_path
4093
        idict["initrd_path"] = instance.initrd_path
4094

    
4095
      if htkind == constants.HT_XEN_HVM31:
4096
        idict["hvm_boot_order"] = instance.hvm_boot_order
4097
        idict["hvm_acpi"] = instance.hvm_acpi
4098
        idict["hvm_pae"] = instance.hvm_pae
4099
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4100

    
4101
      if htkind in constants.HTS_REQ_PORT:
4102
        idict["vnc_bind_address"] = instance.vnc_bind_address
4103
        idict["network_port"] = instance.network_port
4104

    
4105
      result[instance.name] = idict
4106

    
4107
    return result
4108

    
4109

    
4110
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
4114
  HPATH = "instance-modify"
4115
  HTYPE = constants.HTYPE_INSTANCE
4116
  _OP_REQP = ["instance_name"]
4117
  REQ_BGL = False
4118

    
4119
  def ExpandNames(self):
4120
    self._ExpandAndLockInstance()
4121

    
4122
  def BuildHooksEnv(self):
4123
    """Build hooks env.
4124

4125
    This runs on the master, primary and secondaries.
4126

4127
    """
4128
    args = dict()
4129
    if self.mem:
4130
      args['memory'] = self.mem
4131
    if self.vcpus:
4132
      args['vcpus'] = self.vcpus
4133
    if self.do_ip or self.do_bridge or self.mac:
4134
      if self.do_ip:
4135
        ip = self.ip
4136
      else:
4137
        ip = self.instance.nics[0].ip
4138
      if self.bridge:
4139
        bridge = self.bridge
4140
      else:
4141
        bridge = self.instance.nics[0].bridge
4142
      if self.mac:
4143
        mac = self.mac
4144
      else:
4145
        mac = self.instance.nics[0].mac
4146
      args['nics'] = [(ip, bridge, mac)]
4147
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4148
    nl = [self.sstore.GetMasterNode(),
4149
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4150
    return env, nl, nl
4151

    
4152
  def CheckPrereq(self):
4153
    """Check prerequisites.
4154

4155
    This only checks the instance list against the existing names.
4156

4157
    """
4158
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
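    # Illustrative sketch only: the list returned below contains one
    # (parameter, new value) pair per modified field, for example
    # (hypothetical values):
    #   [("mem", 512), ("bridge", "xen-br1")]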
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
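    # Illustrative sketch only, with hypothetical names: the RPC result maps
    # each queried node to the list of exports found on it, e.g.:
    #   {"node1.example.com": ["instance1.example.com"],
    #    "node2.example.com": []}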
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
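    # Outline of the steps performed below: optionally shut the instance
    # down, snapshot its 'sda' disk, restart the instance if it was running,
    # copy the snapshot to the target node, remove the snapshot from the
    # source node, finalize the export on the target node, and finally prune
    # older exports of the same instance from the other nodes.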
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
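    # Illustrative sketch only, with a hypothetical name: the opcode carries
    # a (kind, name) pair, e.g. kind=constants.TAG_NODE and
    # name="node1.example.com"; for cluster tags the name is not used here.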
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) matches.

    """
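    # Illustrative sketch only, with hypothetical values: each match is
    # returned as a (path, tag) tuple, e.g.:
    #   [("/cluster", "staging"), ("/nodes/node1.example.com", "rack1")]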
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
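  # Illustrative sketch only: the opcode must provide "duration",
  # "on_master" (a boolean) and "on_nodes" (a possibly empty list of node
  # names), e.g. duration=10.0, on_master=True, on_nodes=[] -- the concrete
  # values here are hypothetical.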
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4653
  REQ_BGL = False
4654

    
4655
  def ExpandNames(self):
4656
    """Expand names and set required locks.
4657

4658
    This expands the node list, if any.
4659

4660
    """
4661
    self.needed_locks = {}
4662
    if self.op.on_nodes:
4663
      # _GetWantedNodes can be used here, but is not always appropriate to use
4664
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4665
      # more information.
4666
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4667
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4668

    
4669
  def CheckPrereq(self):
4670
    """Check prerequisites.
4671

4672
    """
4673

    
4674
  def Exec(self, feedback_fn):
4675
    """Do the actual sleep.
4676

4677
    """
4678
    if self.op.on_master:
4679
      if not utils.TestDelay(self.op.duration):
4680
        raise errors.OpExecError("Error during master delay test")
4681
    if self.op.on_nodes:
4682
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4683
      if not result:
4684
        raise errors.OpExecError("Complete failure from rpc call")
4685
      for node, node_result in result.items():
4686
        if not node_result:
4687
          raise errors.OpExecError("Failure during rpc call to node %s,"
4688
                                   " result: %s" % (node, node_result))
4689

    
4690

    
4691
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or _RELO_KEYS
      class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
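  # Illustrative sketch only: the input dictionary built by
  # _ComputeClusterData and _BuildInputData has roughly this shape before
  # being serialized into self.in_text:
  #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
  #    "hypervisor_type": ..., "nodes": {...}, "instances": {...},
  #    "request": {...}}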
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
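    # Illustrative sketch only, with hypothetical numbers: each entry of the
    # "nodes" dictionary built below looks like:
    #   {"tags": [], "total_memory": 4096, "free_memory": 1024,
    #    "reserved_memory": 512, "i_pri_memory": 2048, "i_pri_up_memory": 2048,
    #    "total_disk": 102400, "free_disk": 51200, "total_cpus": 4,
    #    "primary_ip": "192.0.2.1", "secondary_ip": "192.0.2.101"}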
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
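    # Illustrative sketch only, with hypothetical values: the request added
    # below looks like:
    #   {"type": "allocate", "name": "instance1.example.com",
    #    "disk_template": ..., "tags": [], "os": ..., "vcpus": 1,
    #    "memory": 512, "disks": [...], "disk_space_total": 1280,
    #    "nics": [...], "required_nodes": 2}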
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
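    # Illustrative sketch only, with hypothetical values: the request added
    # below looks like:
    #   {"type": "relocate", "name": "instance1.example.com",
    #    "disk_space_total": 1280, "required_nodes": 1,
    #    "relocate_from": ["node2.example.com"]}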
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
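    # The runner RPC is expected to return a 4-tuple
    # (rcode, stdout, stderr, fail); anything else is rejected below.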
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
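    # Illustrative sketch only, with hypothetical values: a well-formed
    # allocator reply, once deserialized, looks like:
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}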
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result