#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If no nodes are needed, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      hook_results: the results of the multi-node hooks rpc call
      phase: the hooks phase that has just been run
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


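# Editorial sketch, not part of the original module: a minimal concurrent LU
# wired together from the pieces described above (ExpandNames/DeclareLocks/
# CheckPrereq/Exec plus the _LockInstancesNodes helper). The class name, the
# opcode field and the instance name are hypothetical and only illustrate the
# locking flow documented in the LogicalUnit docstrings.
#
#   class LUExampleQueryPrimaryNode(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # lock one instance, and declare an (empty) node level so that
#       # DeclareLocks gets called for it and can fill it in later
#       self.needed_locks = {
#         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
#         locking.LEVEL_NODE: [],
#         }
#       self.recalculate_locks[locking.LEVEL_NODE] = 1
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       return self.instance.primary_node

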
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: List of fields selected by the caller

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


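# Typical use from a query LU's CheckPrereq (this mirrors LUQueryNodeVolumes
# further down; the field names below are only an illustration):
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)

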
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


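# For orientation (editorial, not part of the original source): given a
# hypothetical one-NIC instance "inst1.example.tld" with primary node "node1"
# and secondary node "node2", the two helpers above would produce hook
# environment keys roughly like the following (values are illustrative only):
#
#   OP_TARGET=inst1.example.tld
#   INSTANCE_NAME=inst1.example.tld
#   INSTANCE_PRIMARY=node1
#   INSTANCE_SECONDARIES=node2
#   INSTANCE_OS_TYPE=debian-etch
#   INSTANCE_STATUS=up
#   INSTANCE_MEMORY=512
#   INSTANCE_VCPUS=1
#   INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_IP=
#   INSTANCE_NIC0_BRIDGE=xen-br0
#   INSTANCE_NIC0_HWADDR=aa:00:00:00:00:01
#
# The hooks runner adds the "GANETI_" prefix to these keys (see BuildHooksEnv).

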
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it would have to take over, should
      # a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and, if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether an LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name is not None, check the given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
1216
  """Logical unit for OS diagnose/query.
1217

1218
  """
1219
  _OP_REQP = ["output_fields", "names"]
1220

    
1221
  def CheckPrereq(self):
1222
    """Check prerequisites.
1223

1224
    This always succeeds, since this is a pure query LU.
1225

1226
    """
1227
    if self.op.names:
1228
      raise errors.OpPrereqError("Selective OS query not supported")
1229

    
1230
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1231
    _CheckOutputFields(static=[],
1232
                       dynamic=self.dynamic_fields,
1233
                       selected=self.op.output_fields)
1234

    
1235
  @staticmethod
1236
  def _DiagnoseByOS(node_list, rlist):
1237
    """Remaps a per-node return list into an a per-os per-node dictionary
1238

1239
      Args:
1240
        node_list: a list with the names of all nodes
1241
        rlist: a map with node names as keys and OS objects as values
1242

1243
      Returns:
1244
        map: a map with osnames as keys and as value another map, with
1245
             nodes as
1246
             keys and list of OS objects as values
1247
             e.g. {"debian-etch": {"node1": [<object>,...],
1248
                                   "node2": [<object>,]}
1249
                  }
1250

1251
    """
1252
    all_os = {}
1253
    for node_name, nr in rlist.iteritems():
1254
      if not nr:
1255
        continue
1256
      for os_obj in nr:
1257
        if os_obj.name not in all_os:
1258
          # build a list of nodes for this os containing empty lists
1259
          # for each node in node_list
1260
          all_os[os_obj.name] = {}
1261
          for nname in node_list:
1262
            all_os[os_obj.name][nname] = []
1263
        all_os[os_obj.name][node_name].append(os_obj)
1264
    return all_os
1265

    
1266
  def Exec(self, feedback_fn):
1267
    """Compute the list of OSes.
1268

1269
    """
1270
    node_list = self.cfg.GetNodeList()
1271
    node_data = rpc.call_os_diagnose(node_list)
1272
    if node_data == False:
1273
      raise errors.OpExecError("Can't gather the list of OSes")
1274
    pol = self._DiagnoseByOS(node_list, node_data)
1275
    output = []
1276
    for os_name, os_data in pol.iteritems():
1277
      row = []
1278
      for field in self.op.output_fields:
1279
        if field == "name":
1280
          val = os_name
1281
        elif field == "valid":
1282
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1283
        elif field == "node_status":
1284
          val = {}
1285
          for node_name, nos_list in os_data.iteritems():
1286
            val[node_name] = [(v.status, v.path) for v in nos_list]
1287
        else:
1288
          raise errors.ParameterError(field)
1289
        row.append(val)
1290
      output.append(row)
1291

    
1292
    return output
1293

    
1294

    
1295
class LURemoveNode(LogicalUnit):
1296
  """Logical unit for removing a node.
1297

1298
  """
1299
  HPATH = "node-remove"
1300
  HTYPE = constants.HTYPE_NODE
1301
  _OP_REQP = ["node_name"]
1302

    
1303
  def BuildHooksEnv(self):
1304
    """Build hooks env.
1305

1306
    This doesn't run on the target node in the pre phase as a failed
1307
    node would then be impossible to remove.
1308

1309
    """
1310
    env = {
1311
      "OP_TARGET": self.op.node_name,
1312
      "NODE_NAME": self.op.node_name,
1313
      }
1314
    all_nodes = self.cfg.GetNodeList()
1315
    all_nodes.remove(self.op.node_name)
1316
    return env, all_nodes, all_nodes
1317

    
1318
  def CheckPrereq(self):
1319
    """Check prerequisites.
1320

1321
    This checks:
1322
     - the node exists in the configuration
1323
     - it does not have primary or secondary instances
1324
     - it's not the master
1325

1326
    Any errors are signalled by raising errors.OpPrereqError.
1327

1328
    """
1329
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1330
    if node is None:
1331
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1332

    
1333
    instance_list = self.cfg.GetInstanceList()
1334

    
1335
    masternode = self.sstore.GetMasterNode()
1336
    if node.name == masternode:
1337
      raise errors.OpPrereqError("Node is the master node,"
1338
                                 " you need to failover first.")
1339

    
1340
    for instance_name in instance_list:
1341
      instance = self.cfg.GetInstanceInfo(instance_name)
1342
      if node.name == instance.primary_node:
1343
        raise errors.OpPrereqError("Instance %s still running on the node,"
1344
                                   " please remove first." % instance_name)
1345
      if node.name in instance.secondary_nodes:
1346
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1347
                                   " please remove first." % instance_name)
1348
    self.op.node_name = node.name
1349
    self.node = node
1350

    
1351
  def Exec(self, feedback_fn):
1352
    """Removes the node from the cluster.
1353

1354
    """
1355
    node = self.node
1356
    logger.Info("stopping the node daemon and removing configs from node %s" %
1357
                node.name)
1358

    
1359
    rpc.call_node_leave_cluster(node.name)
1360

    
1361
    logger.Info("Removing node %s from config" % node.name)
1362

    
1363
    self.cfg.RemoveNode(node.name)
1364
    # Remove the node from the Ganeti Lock Manager
1365
    self.context.glm.remove(locking.LEVEL_NODE, node.name)
1366

    
1367
    utils.RemoveHostFromEtcHosts(node.name)
1368

    
1369

    
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


1544
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
      # Add the new node to the Ganeti Lock Manager
      self.context.glm.add(locking.LEVEL_NODE, node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
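    # Soft and hard reboots are handled by the node daemon in a single call;
    # a "full" reboot is emulated below as an instance shutdown followed by
    # a disk re-assembly and a fresh start.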

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the instance from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
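  # e.g. for an instance named "web1.example.com" (hypothetical) this returns
  # "originstname+web1.example.com".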
  return "originstname+%s" % instance.name
2828

    
2829

    
2830
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

    
2929
  if disk_template not in req_size_dict:
2930
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2931
                                 " is unknown" %  disk_template)
2932

    
2933
  return req_size_dict[disk_template]
2934

    
2935

    
2936
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )
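    # Added note (comment only, not from the original source): the request
    # built above describes the new instance's needs -- two writable disks
    # (data and swap) and a single NIC.  The chosen allocator script is
    # expected to return exactly ial.required_nodes node names in ial.nodes
    # (two for the network-mirrored disk templates, one otherwise), which is
    # what the checks below enforce.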

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if int(ei_version) != constants.EXPORT_VERSION:
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        self.op.file_driver not in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))
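    # Added note (comment only): for illustration, with a hypothetical
    # cluster file storage dir of "/srv/ganeti/file-storage", an empty
    # file_storage_dir and an instance named "inst1.example.com", the join
    # above yields "/srv/ganeti/file-storage/inst1.example.com".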


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
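    # Added note (comment only): the value returned below is the full SSH
    # command line built by SshRunner.BuildCmd, which the caller is expected
    # to run on the master node; for a Xen instance it would look roughly
    # like "ssh -t root@node1.example.com 'xm console inst1.example.com'"
    # (hypothetical node and instance names).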

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.op.remote_node = self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
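      # Added note (comment only): ren_fn keeps the volume group and appends
      # a timestamp suffix to the LV name, so a hypothetical old LV
      # ("xenvg", "11111111-2222.sda_data") would become something like
      # ("xenvg", "11111111-2222.sda_data_replaced-1215000000") before the
      # new LV takes over its original name.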
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
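    # Added note (comment only): the loop below attempts the grow on every
    # node holding this disk, secondaries first and the primary last, so that
    # for mirrored (DRBD) layouts all backing devices are enlarged before the
    # new size is recorded in the configuration.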
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, tuple) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
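    # Added note (comment only): call_blockdev_find returns the device status
    # as reported by the node; elsewhere in this module its sixth element
    # (index 5) is treated as the "degraded" flag, and here the whole result
    # is passed through unmodified as the pstatus/sstatus fields below.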
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
4112
  """Modifies an instances's parameters.
4113

4114
  """
4115
  HPATH = "instance-modify"
4116
  HTYPE = constants.HTYPE_INSTANCE
4117
  _OP_REQP = ["instance_name"]
4118
  REQ_BGL = False
4119

    
4120
  def ExpandNames(self):
4121
    self._ExpandAndLockInstance()
4122

    
4123
  def BuildHooksEnv(self):
4124
    """Build hooks env.
4125

4126
    This runs on the master, primary and secondaries.
4127

4128
    """
4129
    args = dict()
4130
    if self.mem:
4131
      args['memory'] = self.mem
4132
    if self.vcpus:
4133
      args['vcpus'] = self.vcpus
4134
    if self.do_ip or self.do_bridge or self.mac:
4135
      if self.do_ip:
4136
        ip = self.ip
4137
      else:
4138
        ip = self.instance.nics[0].ip
4139
      if self.bridge:
4140
        bridge = self.bridge
4141
      else:
4142
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError("MAC address %s already in use in cluster" %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError("Invalid MAC address %s" % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False
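  # Added note: as an illustration, an opcode carrying duration=10.0,
  # on_master=True and on_nodes=[] makes this LU sleep for ten seconds on the
  # master only; listing node names in on_nodes additionally runs the delay
  # on those nodes via the test_delay RPC call.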

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the requested direction
    and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result