root / lib / cmdlib.py @ 35705d8f


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    # Used to force good behavior when calling helper functions
86
    self.recalculate_locks = {}
87
    self.__ssh = None
88

    
89
    for attr_name in self._OP_REQP:
90
      attr_val = getattr(op, attr_name, None)
91
      if attr_val is None:
92
        raise errors.OpPrereqError("Required parameter '%s' missing" %
93
                                   attr_name)
94

    
95
    if not self.cfg.IsCluster():
96
      raise errors.OpPrereqError("Cluster not initialized yet,"
97
                                 " use 'gnt-cluster init' first.")
98
    if self.REQ_MASTER:
99
      master = sstore.GetMasterNode()
100
      if master != utils.HostInfo().name:
101
        raise errors.OpPrereqError("Commands must be run on the master"
102
                                   " node %s" % master)
103

    
104
  def __GetSSH(self):
105
    """Returns the SshRunner object
106

107
    """
108
    if not self.__ssh:
109
      self.__ssh = ssh.SshRunner(self.sstore)
110
    return self.__ssh
111

    
112
  ssh = property(fget=__GetSSH)
113

    
114
  def ExpandNames(self):
115
    """Expand names for this LU.
116

117
    This method is called before starting to execute the opcode, and it should
118
    update all the parameters of the opcode to their canonical form (e.g. a
119
    short node name must be fully expanded after this method has successfully
120
    completed). This way locking, hooks, logging, etc. can work correctly.
121

122
    LUs which implement this method must also populate the self.needed_locks
123
    member, as a dict with lock levels as keys, and a list of needed lock names
124
    as values. Rules:
125
      - Use an empty dict if you don't need any lock
126
      - If you don't need any lock at a particular level omit that level
127
      - Don't put anything for the BGL level
128
      - If you want all locks at a level use None as a value
129
        (this reflects what LockSet does, and will be replaced before
130
        CheckPrereq with the full list of nodes that have been locked)
131

132
    If you need to share locks (rather than acquire them exclusively) at one
133
    level you can modify self.share_locks, setting a true value (usually 1) for
134
    that level. By default locks are not shared.
135

136
    Examples:
137
    # Acquire all nodes and one instance
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: None,
140
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141
    }
142
    # Acquire just two nodes
143
    self.needed_locks = {
144
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145
    }
146
    # Acquire no locks
147
    self.needed_locks = {} # No, you can't leave it to the default value None
148

149
    """
150
    # The implementation of this method is mandatory only if the new LU is
151
    # concurrent, so that old LUs don't need to be changed all at the same
152
    # time.
153
    if self.REQ_BGL:
154
      self.needed_locks = {} # Exclusive LUs don't need locks.
155
    else:
156
      raise NotImplementedError
157

    
158
  def DeclareLocks(self, level):
159
    """Declare LU locking needs for a level
160

161
    While most LUs can just declare their locking needs at ExpandNames time,
162
    sometimes there's the need to calculate some locks after having acquired
163
    the ones before. This function is called just before acquiring locks at a
164
    particular level, but after acquiring the ones at lower levels, and permits
165
    such calculations. It can be used to modify self.needed_locks, and by
166
    default it does nothing.
167

168
    This function is only called if you have something already set in
169
    self.needed_locks for the level.
170

171
    @param level: Locking level which is going to be locked
172
    @type level: member of ganeti.locking.LEVELS
173

174
    """
175

    
176
  def CheckPrereq(self):
177
    """Check prerequisites for this LU.
178

179
    This method should check that the prerequisites for the execution
180
    of this LU are fulfilled. It can do internode communication, but
181
    it should be idempotent - no cluster or system changes are
182
    allowed.
183

184
    The method should raise errors.OpPrereqError in case something is
185
    not fulfilled. Its return value is ignored.
186

187
    This method should also update all the parameters of the opcode to
188
    their canonical form if it hasn't been done by ExpandNames before.
189

190
    """
191
    raise NotImplementedError
192

    
193
  def Exec(self, feedback_fn):
194
    """Execute the LU.
195

196
    This method should implement the actual work. It should raise
197
    errors.OpExecError for failures that are somewhat dealt with in
198
    code, or expected.
199

200
    """
201
    raise NotImplementedError
202

    
203
  def BuildHooksEnv(self):
204
    """Build hooks environment for this LU.
205

206
    This method should return a three-node tuple consisting of: a dict
207
    containing the environment that will be used for running the
208
    specific hook for this LU, a list of node names on which the hook
209
    should run before the execution, and a list of node names on which
210
    the hook should run after the execution.
211

212
    The keys of the dict must not have 'GANETI_' prefixed as this will
213
    be handled in the hooks runner. Also note additional keys will be
214
    added by the hooks runner. If the LU doesn't define any
215
    environment, an empty dict (and not None) should be returned.
216

217
    If there are no nodes, an empty list (and not None) should be returned.
218

219
    Note that if the HPATH for a LU class is None, this function will
220
    not be called.
221

222
    """
223
    raise NotImplementedError
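    # Illustrative sketch (assumption, not from the original module): a
    # node-level LU might implement this as
    #
    #   env = {"OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name}
    #   return env, [self.sstore.GetMasterNode()], [self.op.node_name]
    #
    # i.e. (environment, nodes to run the pre-hook on, nodes to run the
    # post-hook on); the hooks runner itself prefixes the keys with "GANETI_".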
224

    
225
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226
    """Notify the LU about the results of its hooks.
227

228
    This method is called every time a hooks phase is executed, and notifies
229
    the Logical Unit about the hooks' result. The LU can then use it to alter
230
    its result based on the hooks.  By default the method does nothing and the
231
    previous result is passed back unchanged but any LU can define it if it
232
    wants to use the local cluster hook-scripts somehow.
233

234
    Args:
235
      phase: the hooks phase that has just been run
236
      hook_results: the results of the multi-node hooks rpc call
237
      feedback_fn: function to send feedback back to the caller
238
      lu_result: the previous result this LU had, or None in the PRE phase.
239

240
    """
241
    return lu_result
242

    
243
  def _ExpandAndLockInstance(self):
244
    """Helper function to expand and lock an instance.
245

246
    Many LUs that work on an instance take its name in self.op.instance_name
247
    and need to expand it and then declare the expanded name for locking. This
248
    function does it, and then updates self.op.instance_name to the expanded
249
    name. It also initializes needed_locks as a dict, if this hasn't been done
250
    before.
251

252
    """
253
    if self.needed_locks is None:
254
      self.needed_locks = {}
255
    else:
256
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257
        "_ExpandAndLockInstance called with instance-level locks set"
258
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259
    if expanded_name is None:
260
      raise errors.OpPrereqError("Instance '%s' not known" %
261
                                  self.op.instance_name)
262
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263
    self.op.instance_name = expanded_name
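    # Illustrative sketch (assumption, hypothetical names): if the opcode came
    # in with instance_name == "inst1", the lines above leave both
    # self.op.instance_name and self.needed_locks[locking.LEVEL_INSTANCE]
    # set to the expanded name, e.g. "inst1.example.com".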
264

    
265
  def _LockInstancesNodes(self):
266
    """Helper function to declare instances' nodes for locking.
267

268
    This function should be called after locking one or more instances to lock
269
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270
    with all primary or secondary nodes for instances already locked and
271
    present in self.needed_locks[locking.LEVEL_INSTANCE].
272

273
    It should be called from DeclareLocks, and for safety only works if
274
    self.recalculate_locks[locking.LEVEL_NODE] is set.
275

276
    In the future it may grow parameters to just lock some instance's nodes, or
277
    to just lock primaries or secondary nodes, if needed.
278

279
    It should be called from DeclareLocks in a way similar to:
280

281
    if level == locking.LEVEL_NODE:
282
      self._LockInstancesNodes()
283

284
    """
285
    assert locking.LEVEL_NODE in self.recalculate_locks, \
286
      "_LockInstancesNodes helper function called with no nodes to recalculate"
287

    
288
    # TODO: check if we've really been called with the instance locks held
289

    
290
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
291
    # future we might want to have different behaviors depending on the value
292
    # of self.recalculate_locks[locking.LEVEL_NODE]
293
    wanted_nodes = []
294
    for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
295
      instance = self.context.cfg.GetInstanceInfo(instance_name)
296
      wanted_nodes.append(instance.primary_node)
297
      wanted_nodes.extend(instance.secondary_nodes)
298
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
299

    
300
    del self.recalculate_locks[locking.LEVEL_NODE]
301

    
302

    
303
class NoHooksLU(LogicalUnit):
304
  """Simple LU which runs no hooks.
305

306
  This LU is intended as a parent for other LogicalUnits which will
307
  run no hooks, in order to reduce duplicate code.
308

309
  """
310
  HPATH = None
311
  HTYPE = None
312

    
313

    
314
def _GetWantedNodes(lu, nodes):
315
  """Returns list of checked and expanded node names.
316

317
  Args:
318
    nodes: list of node names (strings), or an empty list for all nodes
319

320
  """
321
  if not isinstance(nodes, list):
322
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
323

    
324
  if nodes:
325
    wanted = []
326

    
327
    for name in nodes:
328
      node = lu.cfg.ExpandNodeName(name)
329
      if node is None:
330
        raise errors.OpPrereqError("No such node name '%s'" % name)
331
      wanted.append(node)
332

    
333
  else:
334
    wanted = lu.cfg.GetNodeList()
335
  return utils.NiceSort(wanted)
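# Illustrative sketch (assumption, not part of the original module; node names
# are hypothetical):
#
#   _GetWantedNodes(lu, ["node2", "node1"])
#     -> ["node1.example.com", "node2.example.com"]  (expanded and sorted)
#   _GetWantedNodes(lu, [])
#     -> all nodes in the configuration, sorted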
336

    
337

    
338
def _GetWantedInstances(lu, instances):
339
  """Returns list of checked and expanded instance names.
340

341
  Args:
342
    instances: list of instance names (strings), or an empty list for all
343

344
  """
345
  if not isinstance(instances, list):
346
    raise errors.OpPrereqError("Invalid argument type 'instances'")
347

    
348
  if instances:
349
    wanted = []
350

    
351
    for name in instances:
352
      instance = lu.cfg.ExpandInstanceName(name)
353
      if instance is None:
354
        raise errors.OpPrereqError("No such instance name '%s'" % name)
355
      wanted.append(instance)
356

    
357
  else:
358
    wanted = lu.cfg.GetInstanceList()
359
  return utils.NiceSort(wanted)
360

    
361

    
362
def _CheckOutputFields(static, dynamic, selected):
363
  """Checks whether all selected fields are valid.
364

365
  Args:
366
    static: Static fields
367
    dynamic: Dynamic fields
368

369
  """
370
  static_fields = frozenset(static)
371
  dynamic_fields = frozenset(dynamic)
372

    
373
  all_fields = static_fields | dynamic_fields
374

    
375
  if not all_fields.issuperset(selected):
376
    raise errors.OpPrereqError("Unknown output fields selected: %s"
377
                               % ",".join(frozenset(selected).
378
                                          difference(all_fields)))
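# Illustrative sketch (assumption): a query LU typically calls
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=self.op.output_fields)
#
# which raises errors.OpPrereqError if output_fields names anything outside
# the set {"name", "mfree"}.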
379

    
380

    
381
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
382
                          memory, vcpus, nics):
383
  """Builds instance related env variables for hooks from single variables.
384

385
  Args:
386
    secondary_nodes: List of secondary nodes as strings
387
  """
388
  env = {
389
    "OP_TARGET": name,
390
    "INSTANCE_NAME": name,
391
    "INSTANCE_PRIMARY": primary_node,
392
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
393
    "INSTANCE_OS_TYPE": os_type,
394
    "INSTANCE_STATUS": status,
395
    "INSTANCE_MEMORY": memory,
396
    "INSTANCE_VCPUS": vcpus,
397
  }
398

    
399
  if nics:
400
    nic_count = len(nics)
401
    for idx, (ip, bridge, mac) in enumerate(nics):
402
      if ip is None:
403
        ip = ""
404
      env["INSTANCE_NIC%d_IP" % idx] = ip
405
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
406
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
407
  else:
408
    nic_count = 0
409

    
410
  env["INSTANCE_NIC_COUNT"] = nic_count
411

    
412
  return env
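# Illustrative sketch (assumption): for a hypothetical instance with one NIC,
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-etch", "up", 512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
#
# returns a dict with OP_TARGET/INSTANCE_NAME set to the instance name plus
# INSTANCE_PRIMARY, INSTANCE_SECONDARIES, INSTANCE_OS_TYPE, INSTANCE_STATUS,
# INSTANCE_MEMORY, INSTANCE_VCPUS, INSTANCE_NIC_COUNT and the per-NIC
# INSTANCE_NIC0_IP / INSTANCE_NIC0_BRIDGE / INSTANCE_NIC0_HWADDR keys.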
413

    
414

    
415
def _BuildInstanceHookEnvByObject(instance, override=None):
416
  """Builds instance related env variables for hooks from an object.
417

418
  Args:
419
    instance: objects.Instance object of instance
420
    override: dict of values to override
421
  """
422
  args = {
423
    'name': instance.name,
424
    'primary_node': instance.primary_node,
425
    'secondary_nodes': instance.secondary_nodes,
426
    'os_type': instance.os,
427
    'status': instance.status,
428
    'memory': instance.memory,
429
    'vcpus': instance.vcpus,
430
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
431
  }
432
  if override:
433
    args.update(override)
434
  return _BuildInstanceHookEnv(**args)
435

    
436

    
437
def _CheckInstanceBridgesExist(instance):
438
  """Check that the brigdes needed by an instance exist.
439

440
  """
441
  # check bridges existence
442
  brlist = [nic.bridge for nic in instance.nics]
443
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
444
    raise errors.OpPrereqError("one or more target bridges %s does not"
445
                               " exist on destination node '%s'" %
446
                               (brlist, instance.primary_node))
447

    
448

    
449
class LUDestroyCluster(NoHooksLU):
450
  """Logical unit for destroying the cluster.
451

452
  """
453
  _OP_REQP = []
454

    
455
  def CheckPrereq(self):
456
    """Check prerequisites.
457

458
    This checks whether the cluster is empty.
459

460
    Any errors are signalled by raising errors.OpPrereqError.
461

462
    """
463
    master = self.sstore.GetMasterNode()
464

    
465
    nodelist = self.cfg.GetNodeList()
466
    if len(nodelist) != 1 or nodelist[0] != master:
467
      raise errors.OpPrereqError("There are still %d node(s) in"
468
                                 " this cluster." % (len(nodelist) - 1))
469
    instancelist = self.cfg.GetInstanceList()
470
    if instancelist:
471
      raise errors.OpPrereqError("There are still %d instance(s) in"
472
                                 " this cluster." % len(instancelist))
473

    
474
  def Exec(self, feedback_fn):
475
    """Destroys the cluster.
476

477
    """
478
    master = self.sstore.GetMasterNode()
479
    if not rpc.call_node_stop_master(master, False):
480
      raise errors.OpExecError("Could not disable the master role")
481
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
482
    utils.CreateBackup(priv_key)
483
    utils.CreateBackup(pub_key)
484
    return master
485

    
486

    
487
class LUVerifyCluster(LogicalUnit):
488
  """Verifies the cluster status.
489

490
  """
491
  HPATH = "cluster-verify"
492
  HTYPE = constants.HTYPE_CLUSTER
493
  _OP_REQP = ["skip_checks"]
494

    
495
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
496
                  remote_version, feedback_fn):
497
    """Run multiple tests against a node.
498

499
    Test list:
500
      - compares ganeti version
501
      - checks vg existence and size > 20G
502
      - checks config file checksum
503
      - checks ssh to other nodes
504

505
    Args:
506
      node: name of the node to check
507
      file_list: required list of files
508
      local_cksum: dictionary of local files and their checksums
509

510
    """
511
    # compares ganeti version
512
    local_version = constants.PROTOCOL_VERSION
513
    if not remote_version:
514
      feedback_fn("  - ERROR: connection to %s failed" % (node))
515
      return True
516

    
517
    if local_version != remote_version:
518
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
519
                      (local_version, node, remote_version))
520
      return True
521

    
522
    # checks vg existence and size > 20G
523

    
524
    bad = False
525
    if not vglist:
526
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
527
                      (node,))
528
      bad = True
529
    else:
530
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
531
                                            constants.MIN_VG_SIZE)
532
      if vgstatus:
533
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
534
        bad = True
535

    
536
    # checks config file checksum
537
    # checks ssh to any
538

    
539
    if 'filelist' not in node_result:
540
      bad = True
541
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
542
    else:
543
      remote_cksum = node_result['filelist']
544
      for file_name in file_list:
545
        if file_name not in remote_cksum:
546
          bad = True
547
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
548
        elif remote_cksum[file_name] != local_cksum[file_name]:
549
          bad = True
550
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
551

    
552
    if 'nodelist' not in node_result:
553
      bad = True
554
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
555
    else:
556
      if node_result['nodelist']:
557
        bad = True
558
        for node in node_result['nodelist']:
559
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
560
                          (node, node_result['nodelist'][node]))
561
    if 'node-net-test' not in node_result:
562
      bad = True
563
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
564
    else:
565
      if node_result['node-net-test']:
566
        bad = True
567
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
568
        for node in nlist:
569
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
570
                          (node, node_result['node-net-test'][node]))
571

    
572
    hyp_result = node_result.get('hypervisor', None)
573
    if hyp_result is not None:
574
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
575
    return bad
576

    
577
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
578
                      node_instance, feedback_fn):
579
    """Verify an instance.
580

581
    This function checks to see if the required block devices are
582
    available on the instance's node.
583

584
    """
585
    bad = False
586

    
587
    node_current = instanceconfig.primary_node
588

    
589
    node_vol_should = {}
590
    instanceconfig.MapLVsByNode(node_vol_should)
591

    
592
    for node in node_vol_should:
593
      for volume in node_vol_should[node]:
594
        if node not in node_vol_is or volume not in node_vol_is[node]:
595
          feedback_fn("  - ERROR: volume %s missing on node %s" %
596
                          (volume, node))
597
          bad = True
598

    
599
    if not instanceconfig.status == 'down':
600
      if (node_current not in node_instance or
601
          not instance in node_instance[node_current]):
602
        feedback_fn("  - ERROR: instance %s not running on node %s" %
603
                        (instance, node_current))
604
        bad = True
605

    
606
    for node in node_instance:
607
      if (not node == node_current):
608
        if instance in node_instance[node]:
609
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
610
                          (instance, node))
611
          bad = True
612

    
613
    return bad
614

    
615
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
616
    """Verify if there are any unknown volumes in the cluster.
617

618
    The .os, .swap and backup volumes are ignored. All other volumes are
619
    reported as unknown.
620

621
    """
622
    bad = False
623

    
624
    for node in node_vol_is:
625
      for volume in node_vol_is[node]:
626
        if node not in node_vol_should or volume not in node_vol_should[node]:
627
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
628
                      (volume, node))
629
          bad = True
630
    return bad
631

    
632
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
633
    """Verify the list of running instances.
634

635
    This checks what instances are running but unknown to the cluster.
636

637
    """
638
    bad = False
639
    for node in node_instance:
640
      for runninginstance in node_instance[node]:
641
        if runninginstance not in instancelist:
642
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
643
                          (runninginstance, node))
644
          bad = True
645
    return bad
646

    
647
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
648
    """Verify N+1 Memory Resilience.
649

650
    Check that if one single node dies we can still start all the instances it
651
    was primary for.
652

653
    """
654
    bad = False
655

    
656
    for node, nodeinfo in node_info.iteritems():
657
      # This code checks that every node which is now listed as secondary has
658
      # enough memory to host all the instances it would take over, should
659
      # a single other node in the cluster fail.
660
      # FIXME: not ready for failover to an arbitrary node
661
      # FIXME: does not support file-backed instances
662
      # WARNING: we currently take into account down instances as well as up
663
      # ones, considering that even if they're down someone might want to start
664
      # them even in the event of a node failure.
665
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
666
        needed_mem = 0
667
        for instance in instances:
668
          needed_mem += instance_cfg[instance].memory
669
        if nodeinfo['mfree'] < needed_mem:
670
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
671
                      " failovers should node %s fail" % (node, prinode))
672
          bad = True
673
    return bad
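    # Illustrative sketch (assumption, hypothetical names): with
    #   node_info["nodeB"]["sinst-by-pnode"] == {"nodeA": ["inst1", "inst2"]}
    #   node_info["nodeB"]["mfree"] == 1024
    # and inst1 + inst2 together configured for 1536 MB of memory, the loop
    # above flags nodeB as unable to accommodate the failovers should nodeA
    # fail.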
674

    
675
  def CheckPrereq(self):
676
    """Check prerequisites.
677

678
    Transform the list of checks we're going to skip into a set and check that
679
    all its members are valid.
680

681
    """
682
    self.skip_set = frozenset(self.op.skip_checks)
683
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
684
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
685

    
686
  def BuildHooksEnv(self):
687
    """Build hooks env.
688

689
    Cluster-Verify hooks are run only in the post phase; if they fail, their
690
    output is logged in the verify output and the verification fails.
691

692
    """
693
    all_nodes = self.cfg.GetNodeList()
694
    # TODO: populate the environment with useful information for verify hooks
695
    env = {}
696
    return env, [], all_nodes
697

    
698
  def Exec(self, feedback_fn):
699
    """Verify integrity of cluster, performing various test on nodes.
700

701
    """
702
    bad = False
703
    feedback_fn("* Verifying global settings")
704
    for msg in self.cfg.VerifyConfig():
705
      feedback_fn("  - ERROR: %s" % msg)
706

    
707
    vg_name = self.cfg.GetVGName()
708
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
709
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
710
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
711
    i_non_redundant = [] # Non redundant instances
712
    node_volume = {}
713
    node_instance = {}
714
    node_info = {}
715
    instance_cfg = {}
716

    
717
    # FIXME: verify OS list
718
    # do local checksums
719
    file_names = list(self.sstore.GetFileList())
720
    file_names.append(constants.SSL_CERT_FILE)
721
    file_names.append(constants.CLUSTER_CONF_FILE)
722
    local_checksums = utils.FingerprintFiles(file_names)
723

    
724
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
725
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
726
    all_instanceinfo = rpc.call_instance_list(nodelist)
727
    all_vglist = rpc.call_vg_list(nodelist)
728
    node_verify_param = {
729
      'filelist': file_names,
730
      'nodelist': nodelist,
731
      'hypervisor': None,
732
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
733
                        for node in nodeinfo]
734
      }
735
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
736
    all_rversion = rpc.call_version(nodelist)
737
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
738

    
739
    for node in nodelist:
740
      feedback_fn("* Verifying node %s" % node)
741
      result = self._VerifyNode(node, file_names, local_checksums,
742
                                all_vglist[node], all_nvinfo[node],
743
                                all_rversion[node], feedback_fn)
744
      bad = bad or result
745

    
746
      # node_volume
747
      volumeinfo = all_volumeinfo[node]
748

    
749
      if isinstance(volumeinfo, basestring):
750
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
751
                    (node, volumeinfo[-400:].encode('string_escape')))
752
        bad = True
753
        node_volume[node] = {}
754
      elif not isinstance(volumeinfo, dict):
755
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
756
        bad = True
757
        continue
758
      else:
759
        node_volume[node] = volumeinfo
760

    
761
      # node_instance
762
      nodeinstance = all_instanceinfo[node]
763
      if type(nodeinstance) != list:
764
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
765
        bad = True
766
        continue
767

    
768
      node_instance[node] = nodeinstance
769

    
770
      # node_info
771
      nodeinfo = all_ninfo[node]
772
      if not isinstance(nodeinfo, dict):
773
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
774
        bad = True
775
        continue
776

    
777
      try:
778
        node_info[node] = {
779
          "mfree": int(nodeinfo['memory_free']),
780
          "dfree": int(nodeinfo['vg_free']),
781
          "pinst": [],
782
          "sinst": [],
783
          # dictionary holding all instances this node is secondary for,
784
          # grouped by their primary node. Each key is a cluster node, and each
785
          # value is a list of instances which have the key as primary and the
786
          # current node as secondary.  this is handy to calculate N+1 memory
787
          # availability if you can only failover from a primary to its
788
          # secondary.
789
          "sinst-by-pnode": {},
790
        }
791
      except ValueError:
792
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
793
        bad = True
794
        continue
795

    
796
    node_vol_should = {}
797

    
798
    for instance in instancelist:
799
      feedback_fn("* Verifying instance %s" % instance)
800
      inst_config = self.cfg.GetInstanceInfo(instance)
801
      result = self._VerifyInstance(instance, inst_config, node_volume,
802
                                     node_instance, feedback_fn)
803
      bad = bad or result
804

    
805
      inst_config.MapLVsByNode(node_vol_should)
806

    
807
      instance_cfg[instance] = inst_config
808

    
809
      pnode = inst_config.primary_node
810
      if pnode in node_info:
811
        node_info[pnode]['pinst'].append(instance)
812
      else:
813
        feedback_fn("  - ERROR: instance %s, connection to primary node"
814
                    " %s failed" % (instance, pnode))
815
        bad = True
816

    
817
      # If the instance is non-redundant we cannot survive losing its primary
818
      # node, so we are not N+1 compliant. On the other hand we have no disk
819
      # templates with more than one secondary so that situation is not well
820
      # supported either.
821
      # FIXME: does not support file-backed instances
822
      if len(inst_config.secondary_nodes) == 0:
823
        i_non_redundant.append(instance)
824
      elif len(inst_config.secondary_nodes) > 1:
825
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
826
                    % instance)
827

    
828
      for snode in inst_config.secondary_nodes:
829
        if snode in node_info:
830
          node_info[snode]['sinst'].append(instance)
831
          if pnode not in node_info[snode]['sinst-by-pnode']:
832
            node_info[snode]['sinst-by-pnode'][pnode] = []
833
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
834
        else:
835
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
836
                      " %s failed" % (instance, snode))
837

    
838
    feedback_fn("* Verifying orphan volumes")
839
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
840
                                       feedback_fn)
841
    bad = bad or result
842

    
843
    feedback_fn("* Verifying remaining instances")
844
    result = self._VerifyOrphanInstances(instancelist, node_instance,
845
                                         feedback_fn)
846
    bad = bad or result
847

    
848
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
849
      feedback_fn("* Verifying N+1 Memory redundancy")
850
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
851
      bad = bad or result
852

    
853
    feedback_fn("* Other Notes")
854
    if i_non_redundant:
855
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
856
                  % len(i_non_redundant))
857

    
858
    return not bad
859

    
860
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
861
    """Analize the post-hooks' result, handle it, and send some
862
    nicely-formatted feedback back to the user.
863

864
    Args:
865
      phase: the hooks phase that has just been run
866
      hooks_results: the results of the multi-node hooks rpc call
867
      feedback_fn: function to send feedback back to the caller
868
      lu_result: previous Exec result
869

870
    """
871
    # We only really run POST phase hooks, and are only interested in
872
    # their results
873
    if phase == constants.HOOKS_PHASE_POST:
874
      # Used to change hooks' output to proper indentation
875
      indent_re = re.compile('^', re.M)
876
      feedback_fn("* Hooks Results")
877
      if not hooks_results:
878
        feedback_fn("  - ERROR: general communication failure")
879
        lu_result = 1
880
      else:
881
        for node_name in hooks_results:
882
          show_node_header = True
883
          res = hooks_results[node_name]
884
          if res is False or not isinstance(res, list):
885
            feedback_fn("    Communication failure")
886
            lu_result = 1
887
            continue
888
          for script, hkr, output in res:
889
            if hkr == constants.HKR_FAIL:
890
              # The node header is only shown once, if there are
891
              # failing hooks on that node
892
              if show_node_header:
893
                feedback_fn("  Node %s:" % node_name)
894
                show_node_header = False
895
              feedback_fn("    ERROR: Script %s failed, output:" % script)
896
              output = indent_re.sub('      ', output)
897
              feedback_fn("%s" % output)
898
              lu_result = 1
899

    
900
      return lu_result
901

    
902

    
903
class LUVerifyDisks(NoHooksLU):
904
  """Verifies the cluster disks status.
905

906
  """
907
  _OP_REQP = []
908

    
909
  def CheckPrereq(self):
910
    """Check prerequisites.
911

912
    This has no prerequisites.
913

914
    """
915
    pass
916

    
917
  def Exec(self, feedback_fn):
918
    """Verify integrity of cluster disks.
919

920
    """
921
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
922

    
923
    vg_name = self.cfg.GetVGName()
924
    nodes = utils.NiceSort(self.cfg.GetNodeList())
925
    instances = [self.cfg.GetInstanceInfo(name)
926
                 for name in self.cfg.GetInstanceList()]
927

    
928
    nv_dict = {}
929
    for inst in instances:
930
      inst_lvs = {}
931
      if (inst.status != "up" or
932
          inst.disk_template not in constants.DTS_NET_MIRROR):
933
        continue
934
      inst.MapLVsByNode(inst_lvs)
935
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
936
      for node, vol_list in inst_lvs.iteritems():
937
        for vol in vol_list:
938
          nv_dict[(node, vol)] = inst
939

    
940
    if not nv_dict:
941
      return result
942

    
943
    node_lvs = rpc.call_volume_list(nodes, vg_name)
944

    
945
    to_act = set()
946
    for node in nodes:
947
      # node_volume
948
      lvs = node_lvs[node]
949

    
950
      if isinstance(lvs, basestring):
951
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
952
        res_nlvm[node] = lvs
953
      elif not isinstance(lvs, dict):
954
        logger.Info("connection to node %s failed or invalid data returned" %
955
                    (node,))
956
        res_nodes.append(node)
957
        continue
958

    
959
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
960
        inst = nv_dict.pop((node, lv_name), None)
961
        if (not lv_online and inst is not None
962
            and inst.name not in res_instances):
963
          res_instances.append(inst.name)
964

    
965
    # any leftover items in nv_dict are missing LVs, let's arrange the
966
    # data better
967
    for key, inst in nv_dict.iteritems():
968
      if inst.name not in res_missing:
969
        res_missing[inst.name] = []
970
      res_missing[inst.name].append(key)
971

    
972
    return result
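    # Illustrative sketch (assumption, hypothetical names): the tuple built at
    # the top of this method and returned here is
    #   (nodes that could not be queried, {node: LVM error output},
    #    instances with at least one offline LV,
    #    {instance: [(node, lv_name), ...] for LVs that are missing entirely})
    # e.g. ([], {}, ["inst1.example.com"],
    #       {"inst2.example.com": [("node1.example.com", "abcd1234.disk0_data")]})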
973

    
974

    
975
class LURenameCluster(LogicalUnit):
976
  """Rename the cluster.
977

978
  """
979
  HPATH = "cluster-rename"
980
  HTYPE = constants.HTYPE_CLUSTER
981
  _OP_REQP = ["name"]
982
  REQ_WSSTORE = True
983

    
984
  def BuildHooksEnv(self):
985
    """Build hooks env.
986

987
    """
988
    env = {
989
      "OP_TARGET": self.sstore.GetClusterName(),
990
      "NEW_NAME": self.op.name,
991
      }
992
    mn = self.sstore.GetMasterNode()
993
    return env, [mn], [mn]
994

    
995
  def CheckPrereq(self):
996
    """Verify that the passed name is a valid one.
997

998
    """
999
    hostname = utils.HostInfo(self.op.name)
1000

    
1001
    new_name = hostname.name
1002
    self.ip = new_ip = hostname.ip
1003
    old_name = self.sstore.GetClusterName()
1004
    old_ip = self.sstore.GetMasterIP()
1005
    if new_name == old_name and new_ip == old_ip:
1006
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1007
                                 " cluster has changed")
1008
    if new_ip != old_ip:
1009
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1010
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1011
                                   " reachable on the network. Aborting." %
1012
                                   new_ip)
1013

    
1014
    self.op.name = new_name
1015

    
1016
  def Exec(self, feedback_fn):
1017
    """Rename the cluster.
1018

1019
    """
1020
    clustername = self.op.name
1021
    ip = self.ip
1022
    ss = self.sstore
1023

    
1024
    # shutdown the master IP
1025
    master = ss.GetMasterNode()
1026
    if not rpc.call_node_stop_master(master, False):
1027
      raise errors.OpExecError("Could not disable the master role")
1028

    
1029
    try:
1030
      # modify the sstore
1031
      ss.SetKey(ss.SS_MASTER_IP, ip)
1032
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1033

    
1034
      # Distribute updated ss config to all nodes
1035
      myself = self.cfg.GetNodeInfo(master)
1036
      dist_nodes = self.cfg.GetNodeList()
1037
      if myself.name in dist_nodes:
1038
        dist_nodes.remove(myself.name)
1039

    
1040
      logger.Debug("Copying updated ssconf data to all nodes")
1041
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1042
        fname = ss.KeyToFilename(keyname)
1043
        result = rpc.call_upload_file(dist_nodes, fname)
1044
        for to_node in dist_nodes:
1045
          if not result[to_node]:
1046
            logger.Error("copy of file %s to node %s failed" %
1047
                         (fname, to_node))
1048
    finally:
1049
      if not rpc.call_node_start_master(master, False):
1050
        logger.Error("Could not re-enable the master role on the master,"
1051
                     " please restart manually.")
1052

    
1053

    
1054
def _RecursiveCheckIfLVMBased(disk):
1055
  """Check if the given disk or its children are lvm-based.
1056

1057
  Args:
1058
    disk: ganeti.objects.Disk object
1059

1060
  Returns:
1061
    boolean indicating whether a LD_LV dev_type was found or not
1062

1063
  """
1064
  if disk.children:
1065
    for chdisk in disk.children:
1066
      if _RecursiveCheckIfLVMBased(chdisk):
1067
        return True
1068
  return disk.dev_type == constants.LD_LV
1069

    
1070

    
1071
class LUSetClusterParams(LogicalUnit):
1072
  """Change the parameters of the cluster.
1073

1074
  """
1075
  HPATH = "cluster-modify"
1076
  HTYPE = constants.HTYPE_CLUSTER
1077
  _OP_REQP = []
1078

    
1079
  def BuildHooksEnv(self):
1080
    """Build hooks env.
1081

1082
    """
1083
    env = {
1084
      "OP_TARGET": self.sstore.GetClusterName(),
1085
      "NEW_VG_NAME": self.op.vg_name,
1086
      }
1087
    mn = self.sstore.GetMasterNode()
1088
    return env, [mn], [mn]
1089

    
1090
  def CheckPrereq(self):
1091
    """Check prerequisites.
1092

1093
    This checks whether the given params don't conflict and
1094
    if the given volume group is valid.
1095

1096
    """
1097
    if not self.op.vg_name:
1098
      instances = [self.cfg.GetInstanceInfo(name)
1099
                   for name in self.cfg.GetInstanceList()]
1100
      for inst in instances:
1101
        for disk in inst.disks:
1102
          if _RecursiveCheckIfLVMBased(disk):
1103
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1104
                                       " lvm-based instances exist")
1105

    
1106
    # if vg_name not None, checks given volume group on all nodes
1107
    if self.op.vg_name:
1108
      node_list = self.cfg.GetNodeList()
1109
      vglist = rpc.call_vg_list(node_list)
1110
      for node in node_list:
1111
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1112
                                              constants.MIN_VG_SIZE)
1113
        if vgstatus:
1114
          raise errors.OpPrereqError("Error on node '%s': %s" %
1115
                                     (node, vgstatus))
1116

    
1117
  def Exec(self, feedback_fn):
1118
    """Change the parameters of the cluster.
1119

1120
    """
1121
    if self.op.vg_name != self.cfg.GetVGName():
1122
      self.cfg.SetVGName(self.op.vg_name)
1123
    else:
1124
      feedback_fn("Cluster LVM configuration already in desired"
1125
                  " state, not changing")
1126

    
1127

    
1128
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1129
  """Sleep and poll for an instance's disk to sync.
1130

1131
  """
1132
  if not instance.disks:
1133
    return True
1134

    
1135
  if not oneshot:
1136
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1137

    
1138
  node = instance.primary_node
1139

    
1140
  for dev in instance.disks:
1141
    cfgw.SetDiskID(dev, node)
1142

    
1143
  retries = 0
1144
  while True:
1145
    max_time = 0
1146
    done = True
1147
    cumul_degraded = False
1148
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1149
    if not rstats:
1150
      proc.LogWarning("Can't get any data from node %s" % node)
1151
      retries += 1
1152
      if retries >= 10:
1153
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1154
                                 " aborting." % node)
1155
      time.sleep(6)
1156
      continue
1157
    retries = 0
1158
    for i in range(len(rstats)):
1159
      mstat = rstats[i]
1160
      if mstat is None:
1161
        proc.LogWarning("Can't compute data for node %s/%s" %
1162
                        (node, instance.disks[i].iv_name))
1163
        continue
1164
      # we ignore the ldisk parameter
1165
      perc_done, est_time, is_degraded, _ = mstat
1166
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1167
      if perc_done is not None:
1168
        done = False
1169
        if est_time is not None:
1170
          rem_time = "%d estimated seconds remaining" % est_time
1171
          max_time = est_time
1172
        else:
1173
          rem_time = "no time estimate"
1174
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1175
                     (instance.disks[i].iv_name, perc_done, rem_time))
1176
    if done or oneshot:
1177
      break
1178

    
1179
    time.sleep(min(60, max_time))
1180

    
1181
  if done:
1182
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1183
  return not cumul_degraded
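# Illustrative sketch (assumption): each mirror status entry consumed above is
# expected to be a 4-tuple
#
#   (sync_percent, estimated_seconds, is_degraded, ldisk)
#
# e.g. (85.5, 120, True, False) for a DRBD disk that is 85.5% synced with about
# two minutes to go; sync_percent is None once the device is fully synced.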
1184

    
1185

    
1186
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1187
  """Check that mirrors are not degraded.
1188

1189
  The ldisk parameter, if True, will change the test from the
1190
  is_degraded attribute (which represents overall non-ok status for
1191
  the device(s)) to the ldisk (representing the local storage status).
1192

1193
  """
1194
  cfgw.SetDiskID(dev, node)
1195
  if ldisk:
1196
    idx = 6
1197
  else:
1198
    idx = 5
1199

    
1200
  result = True
1201
  if on_primary or dev.AssembleOnSecondary():
1202
    rstats = rpc.call_blockdev_find(node, dev)
1203
    if not rstats:
1204
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1205
      result = False
1206
    else:
1207
      result = result and (not rstats[idx])
1208
  if dev.children:
1209
    for child in dev.children:
1210
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1211

    
1212
  return result
1213

    
1214

    
1215
class LUDiagnoseOS(NoHooksLU):
1216
  """Logical unit for OS diagnose/query.
1217

1218
  """
1219
  _OP_REQP = ["output_fields", "names"]
1220

    
1221
  def CheckPrereq(self):
1222
    """Check prerequisites.
1223

1224
    This always succeeds, since this is a pure query LU.
1225

1226
    """
1227
    if self.op.names:
1228
      raise errors.OpPrereqError("Selective OS query not supported")
1229

    
1230
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1231
    _CheckOutputFields(static=[],
1232
                       dynamic=self.dynamic_fields,
1233
                       selected=self.op.output_fields)
1234

    
1235
  @staticmethod
1236
  def _DiagnoseByOS(node_list, rlist):
1237
    """Remaps a per-node return list into an a per-os per-node dictionary
1238

1239
      Args:
1240
        node_list: a list with the names of all nodes
1241
        rlist: a map with node names as keys and OS objects as values
1242

1243
      Returns:
1244
        map: a map with osnames as keys and as value another map, with
1245
             nodes as keys and list of OS objects as values
1247
             e.g. {"debian-etch": {"node1": [<object>,...],
1248
                                   "node2": [<object>,]}
1249
                  }
1250

1251
    """
1252
    all_os = {}
1253
    for node_name, nr in rlist.iteritems():
1254
      if not nr:
1255
        continue
1256
      for os_obj in nr:
1257
        if os_obj.name not in all_os:
1258
          # build a list of nodes for this os containing empty lists
1259
          # for each node in node_list
1260
          all_os[os_obj.name] = {}
1261
          for nname in node_list:
1262
            all_os[os_obj.name][nname] = []
1263
        all_os[os_obj.name][node_name].append(os_obj)
1264
    return all_os
1265

    
1266
  def Exec(self, feedback_fn):
1267
    """Compute the list of OSes.
1268

1269
    """
1270
    node_list = self.cfg.GetNodeList()
1271
    node_data = rpc.call_os_diagnose(node_list)
1272
    if node_data == False:
1273
      raise errors.OpExecError("Can't gather the list of OSes")
1274
    pol = self._DiagnoseByOS(node_list, node_data)
1275
    output = []
1276
    for os_name, os_data in pol.iteritems():
1277
      row = []
1278
      for field in self.op.output_fields:
1279
        if field == "name":
1280
          val = os_name
1281
        elif field == "valid":
1282
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1283
        elif field == "node_status":
1284
          val = {}
1285
          for node_name, nos_list in os_data.iteritems():
1286
            val[node_name] = [(v.status, v.path) for v in nos_list]
1287
        else:
1288
          raise errors.ParameterError(field)
1289
        row.append(val)
1290
      output.append(row)
1291

    
1292
    return output
1293

    
1294

    
1295
class LURemoveNode(LogicalUnit):
1296
  """Logical unit for removing a node.
1297

1298
  """
1299
  HPATH = "node-remove"
1300
  HTYPE = constants.HTYPE_NODE
1301
  _OP_REQP = ["node_name"]
1302

    
1303
  def BuildHooksEnv(self):
1304
    """Build hooks env.
1305

1306
    This doesn't run on the target node in the pre phase as a failed
1307
    node would then be impossible to remove.
1308

1309
    """
1310
    env = {
1311
      "OP_TARGET": self.op.node_name,
1312
      "NODE_NAME": self.op.node_name,
1313
      }
1314
    all_nodes = self.cfg.GetNodeList()
1315
    all_nodes.remove(self.op.node_name)
1316
    return env, all_nodes, all_nodes
1317

    
1318
  def CheckPrereq(self):
1319
    """Check prerequisites.
1320

1321
    This checks:
1322
     - the node exists in the configuration
1323
     - it does not have primary or secondary instances
1324
     - it's not the master
1325

1326
    Any errors are signalled by raising errors.OpPrereqError.
1327

1328
    """
1329
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1330
    if node is None:
1331
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1332

    
1333
    instance_list = self.cfg.GetInstanceList()
1334

    
1335
    masternode = self.sstore.GetMasterNode()
1336
    if node.name == masternode:
1337
      raise errors.OpPrereqError("Node is the master node,"
1338
                                 " you need to failover first.")
1339

    
1340
    for instance_name in instance_list:
1341
      instance = self.cfg.GetInstanceInfo(instance_name)
1342
      if node.name == instance.primary_node:
1343
        raise errors.OpPrereqError("Instance %s still running on the node,"
1344
                                   " please remove first." % instance_name)
1345
      if node.name in instance.secondary_nodes:
1346
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1347
                                   " please remove first." % instance_name)
1348
    self.op.node_name = node.name
1349
    self.node = node
1350

    
1351
  def Exec(self, feedback_fn):
1352
    """Removes the node from the cluster.
1353

1354
    """
1355
    node = self.node
1356
    logger.Info("stopping the node daemon and removing configs from node %s" %
1357
                node.name)
1358

    
1359
    self.context.RemoveNode(node.name)
1360

    
1361
    rpc.call_node_leave_cluster(node.name)
1362

    
1363

    
1364
class LUQueryNodes(NoHooksLU):
1365
  """Logical unit for querying nodes.
1366

1367
  """
1368
  _OP_REQP = ["output_fields", "names"]
1369
  REQ_BGL = False
1370

    
1371
  def ExpandNames(self):
1372
    self.dynamic_fields = frozenset([
1373
      "dtotal", "dfree",
1374
      "mtotal", "mnode", "mfree",
1375
      "bootid",
1376
      "ctotal",
1377
      ])
1378

    
1379
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1380
                               "pinst_list", "sinst_list",
1381
                               "pip", "sip", "tags"],
1382
                       dynamic=self.dynamic_fields,
1383
                       selected=self.op.output_fields)
1384

    
1385
    self.needed_locks = {}
1386
    self.share_locks[locking.LEVEL_NODE] = 1
1387
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
1388
    # that we need atomic ways to get info for a group of nodes from the
1389
    # config, though.
1390
    if not self.op.names:
1391
      self.needed_locks[locking.LEVEL_NODE] = None
1392
    else:
1393
      self.needed_locks[locking.LEVEL_NODE] = \
1394
        _GetWantedNodes(self, self.op.names)
1395

    
1396
  def CheckPrereq(self):
1397
    """Check prerequisites.
1398

1399
    """
1400
    # This of course is valid only if we locked the nodes
1401
    self.wanted = self.needed_locks[locking.LEVEL_NODE]
1402

    
1403
  def Exec(self, feedback_fn):
1404
    """Computes the list of nodes and their attributes.
1405

1406
    """
1407
    nodenames = self.wanted
1408
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1409

    
1410
    # begin data gathering
1411

    
1412
    if self.dynamic_fields.intersection(self.op.output_fields):
1413
      live_data = {}
1414
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1415
      for name in nodenames:
1416
        nodeinfo = node_data.get(name, None)
1417
        if nodeinfo:
1418
          live_data[name] = {
1419
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1420
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1421
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1422
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1423
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1424
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1425
            "bootid": nodeinfo['bootid'],
1426
            }
1427
        else:
1428
          live_data[name] = {}
1429
    else:
1430
      live_data = dict.fromkeys(nodenames, {})
1431

    
1432
    node_to_primary = dict([(name, set()) for name in nodenames])
1433
    node_to_secondary = dict([(name, set()) for name in nodenames])
1434

    
1435
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1436
                             "sinst_cnt", "sinst_list"))
1437
    if inst_fields & frozenset(self.op.output_fields):
1438
      instancelist = self.cfg.GetInstanceList()
1439

    
1440
      for instance_name in instancelist:
1441
        inst = self.cfg.GetInstanceInfo(instance_name)
1442
        if inst.primary_node in node_to_primary:
1443
          node_to_primary[inst.primary_node].add(inst.name)
1444
        for secnode in inst.secondary_nodes:
1445
          if secnode in node_to_secondary:
1446
            node_to_secondary[secnode].add(inst.name)
1447

    
1448
    # end data gathering
1449

    
1450
    output = []
1451
    for node in nodelist:
1452
      node_output = []
1453
      for field in self.op.output_fields:
1454
        if field == "name":
1455
          val = node.name
1456
        elif field == "pinst_list":
1457
          val = list(node_to_primary[node.name])
1458
        elif field == "sinst_list":
1459
          val = list(node_to_secondary[node.name])
1460
        elif field == "pinst_cnt":
1461
          val = len(node_to_primary[node.name])
1462
        elif field == "sinst_cnt":
1463
          val = len(node_to_secondary[node.name])
1464
        elif field == "pip":
1465
          val = node.primary_ip
1466
        elif field == "sip":
1467
          val = node.secondary_ip
1468
        elif field == "tags":
1469
          val = list(node.GetTags())
1470
        elif field in self.dynamic_fields:
1471
          val = live_data[node.name].get(field, None)
1472
        else:
1473
          raise errors.ParameterError(field)
1474
        node_output.append(val)
1475
      output.append(node_output)
1476

    
1477
    return output
1478

    
1479

    
1480
class LUQueryNodeVolumes(NoHooksLU):
1481
  """Logical unit for getting volumes on node(s).
1482

1483
  """
1484
  _OP_REQP = ["nodes", "output_fields"]
1485

    
1486
  def CheckPrereq(self):
1487
    """Check prerequisites.
1488

1489
    This checks that the fields required are valid output fields.
1490

1491
    """
1492
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1493

    
1494
    _CheckOutputFields(static=["node"],
1495
                       dynamic=["phys", "vg", "name", "size", "instance"],
1496
                       selected=self.op.output_fields)
1497

    
1498

    
1499
  def Exec(self, feedback_fn):
1500
    """Computes the list of nodes and their attributes.
1501

1502
    """
1503
    nodenames = self.nodes
1504
    volumes = rpc.call_node_volumes(nodenames)
1505

    
1506
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1507
             in self.cfg.GetInstanceList()]
1508

    
1509
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1510

    
1511
    output = []
1512
    for node in nodenames:
1513
      if node not in volumes or not volumes[node]:
1514
        continue
1515

    
1516
      node_vols = volumes[node][:]
1517
      node_vols.sort(key=lambda vol: vol['dev'])
1518

    
1519
      for vol in node_vols:
1520
        node_output = []
1521
        for field in self.op.output_fields:
1522
          if field == "node":
1523
            val = node
1524
          elif field == "phys":
1525
            val = vol['dev']
1526
          elif field == "vg":
1527
            val = vol['vg']
1528
          elif field == "name":
1529
            val = vol['name']
1530
          elif field == "size":
1531
            val = int(float(vol['size']))
1532
          elif field == "instance":
1533
            for inst in ilist:
1534
              if node not in lv_by_node[inst]:
1535
                continue
1536
              if vol['name'] in lv_by_node[inst][node]:
1537
                val = inst.name
1538
                break
1539
            else:
1540
              val = '-'
1541
          else:
1542
            raise errors.ParameterError(field)
1543
          node_output.append(str(val))
1544

    
1545
        output.append(node_output)
1546

    
1547
    return output
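

# Illustrative example (editorial note, not part of the original module):
# with output_fields == ["node", "phys", "vg", "name", "size", "instance"],
# LUQueryNodeVolumes.Exec returns one list of strings per logical volume,
# for example (all values hypothetical):
#
#   [["node1.example.com", "/dev/sda3", "xenvg", "d2ae26x1.sda_data",
#     "10240", "inst1.example.com"],
#    ["node1.example.com", "/dev/sda3", "xenvg", "backup-volume",
#     "2048", "-"]]
#
# A "-" in the instance column means the volume is not used by any
# instance known to the cluster.

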
class LUAddNode(LogicalUnit):
1551
  """Logical unit for adding node to the cluster.
1552

1553
  """
1554
  HPATH = "node-add"
1555
  HTYPE = constants.HTYPE_NODE
1556
  _OP_REQP = ["node_name"]
1557

    
1558
  def BuildHooksEnv(self):
1559
    """Build hooks env.
1560

1561
    This will run on all nodes before, and on all nodes + the new node after.
1562

1563
    """
1564
    env = {
1565
      "OP_TARGET": self.op.node_name,
1566
      "NODE_NAME": self.op.node_name,
1567
      "NODE_PIP": self.op.primary_ip,
1568
      "NODE_SIP": self.op.secondary_ip,
1569
      }
1570
    nodes_0 = self.cfg.GetNodeList()
1571
    nodes_1 = nodes_0 + [self.op.node_name, ]
1572
    return env, nodes_0, nodes_1
1573

    
1574
  def CheckPrereq(self):
1575
    """Check prerequisites.
1576

1577
    This checks:
1578
     - the new node is not already in the config
1579
     - it is resolvable
1580
     - its parameters (single/dual homed) matches the cluster
1581

1582
    Any errors are signalled by raising errors.OpPrereqError.
1583

1584
    """
1585
    node_name = self.op.node_name
1586
    cfg = self.cfg
1587

    
1588
    dns_data = utils.HostInfo(node_name)
1589

    
1590
    node = dns_data.name
1591
    primary_ip = self.op.primary_ip = dns_data.ip
1592
    secondary_ip = getattr(self.op, "secondary_ip", None)
1593
    if secondary_ip is None:
1594
      secondary_ip = primary_ip
1595
    if not utils.IsValidIP(secondary_ip):
1596
      raise errors.OpPrereqError("Invalid secondary IP given")
1597
    self.op.secondary_ip = secondary_ip
1598

    
1599
    node_list = cfg.GetNodeList()
1600
    if not self.op.readd and node in node_list:
1601
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1602
                                 node)
1603
    elif self.op.readd and node not in node_list:
1604
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1605

    
1606
    for existing_node_name in node_list:
1607
      existing_node = cfg.GetNodeInfo(existing_node_name)
1608

    
1609
      if self.op.readd and node == existing_node_name:
1610
        if (existing_node.primary_ip != primary_ip or
1611
            existing_node.secondary_ip != secondary_ip):
1612
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1613
                                     " address configuration as before")
1614
        continue
1615

    
1616
      if (existing_node.primary_ip == primary_ip or
1617
          existing_node.secondary_ip == primary_ip or
1618
          existing_node.primary_ip == secondary_ip or
1619
          existing_node.secondary_ip == secondary_ip):
1620
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1621
                                   " existing node %s" % existing_node.name)
1622

    
1623
    # check that the type of the node (single versus dual homed) is the
1624
    # same as for the master
1625
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1626
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1627
    newbie_singlehomed = secondary_ip == primary_ip
1628
    if master_singlehomed != newbie_singlehomed:
1629
      if master_singlehomed:
1630
        raise errors.OpPrereqError("The master has no private ip but the"
1631
                                   " new node has one")
1632
      else:
1633
        raise errors.OpPrereqError("The master has a private ip but the"
1634
                                   " new node doesn't have one")
1635

    
1636
    # checks reachability
1637
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1638
      raise errors.OpPrereqError("Node not reachable by ping")
1639

    
1640
    if not newbie_singlehomed:
1641
      # check reachability from my secondary ip to newbie's secondary ip
1642
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1643
                           source=myself.secondary_ip):
1644
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1645
                                   " based ping to noded port")
1646

    
1647
    self.new_node = objects.Node(name=node,
1648
                                 primary_ip=primary_ip,
1649
                                 secondary_ip=secondary_ip)
1650

    
1651
  def Exec(self, feedback_fn):
1652
    """Adds the new node to the cluster.
1653

1654
    """
1655
    new_node = self.new_node
1656
    node = new_node.name
1657

    
1658
    # check connectivity
1659
    result = rpc.call_version([node])[node]
1660
    if result:
1661
      if constants.PROTOCOL_VERSION == result:
1662
        logger.Info("communication to node %s fine, sw version %s match" %
1663
                    (node, result))
1664
      else:
1665
        raise errors.OpExecError("Version mismatch master version %s,"
1666
                                 " node version %s" %
1667
                                 (constants.PROTOCOL_VERSION, result))
1668
    else:
1669
      raise errors.OpExecError("Cannot get version from the new node")
1670

    
1671
    # setup ssh on node
1672
    logger.Info("copy ssh key to node %s" % node)
1673
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1674
    keyarray = []
1675
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1676
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1677
                priv_key, pub_key]
1678

    
1679
    for i in keyfiles:
1680
      f = open(i, 'r')
1681
      try:
1682
        keyarray.append(f.read())
1683
      finally:
1684
        f.close()
1685

    
1686
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1687
                               keyarray[3], keyarray[4], keyarray[5])
1688

    
1689
    if not result:
1690
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1691

    
1692
    # Add node to our /etc/hosts, and add key to known_hosts
1693
    utils.AddHostToEtcHosts(new_node.name)
1694

    
1695
    if new_node.secondary_ip != new_node.primary_ip:
1696
      if not rpc.call_node_tcp_ping(new_node.name,
1697
                                    constants.LOCALHOST_IP_ADDRESS,
1698
                                    new_node.secondary_ip,
1699
                                    constants.DEFAULT_NODED_PORT,
1700
                                    10, False):
1701
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1702
                                 " you gave (%s). Please fix and re-run this"
1703
                                 " command." % new_node.secondary_ip)
1704

    
1705
    node_verify_list = [self.sstore.GetMasterNode()]
1706
    node_verify_param = {
1707
      'nodelist': [node],
1708
      # TODO: do a node-net-test as well?
1709
    }
1710

    
1711
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1712
    for verifier in node_verify_list:
1713
      if not result[verifier]:
1714
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1715
                                 " for remote verification" % verifier)
1716
      if result[verifier]['nodelist']:
1717
        for failed in result[verifier]['nodelist']:
1718
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1719
                      (verifier, result[verifier]['nodelist'][failed]))
1720
        raise errors.OpExecError("ssh/hostname verification failed.")
1721

    
1722
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1723
    # including the node just added
1724
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1725
    dist_nodes = self.cfg.GetNodeList()
1726
    if not self.op.readd:
1727
      dist_nodes.append(node)
1728
    if myself.name in dist_nodes:
1729
      dist_nodes.remove(myself.name)
1730

    
1731
    logger.Debug("Copying hosts and known_hosts to all nodes")
1732
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1733
      result = rpc.call_upload_file(dist_nodes, fname)
1734
      for to_node in dist_nodes:
1735
        if not result[to_node]:
1736
          logger.Error("copy of file %s to node %s failed" %
1737
                       (fname, to_node))
1738

    
1739
    to_copy = self.sstore.GetFileList()
1740
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1741
      to_copy.append(constants.VNC_PASSWORD_FILE)
1742
    for fname in to_copy:
1743
      result = rpc.call_upload_file([node], fname)
1744
      if not result[node]:
1745
        logger.Error("could not copy file %s to node %s" % (fname, node))
1746

    
1747
    if self.op.readd:
1748
      self.context.ReaddNode(new_node)
1749
    else:
1750
      self.context.AddNode(new_node)
1751

    
1752

    
1753
class LUQueryClusterInfo(NoHooksLU):
1754
  """Query cluster configuration.
1755

1756
  """
1757
  _OP_REQP = []
1758
  REQ_MASTER = False
1759
  REQ_BGL = False
1760

    
1761
  def ExpandNames(self):
1762
    self.needed_locks = {}
1763

    
1764
  def CheckPrereq(self):
    """No prerequisites needed for this LU.
1766

1767
    """
1768
    pass
1769

    
1770
  def Exec(self, feedback_fn):
1771
    """Return cluster config.
1772

1773
    """
1774
    result = {
1775
      "name": self.sstore.GetClusterName(),
1776
      "software_version": constants.RELEASE_VERSION,
1777
      "protocol_version": constants.PROTOCOL_VERSION,
1778
      "config_version": constants.CONFIG_VERSION,
1779
      "os_api_version": constants.OS_API_VERSION,
1780
      "export_version": constants.EXPORT_VERSION,
1781
      "master": self.sstore.GetMasterNode(),
1782
      "architecture": (platform.architecture()[0], platform.machine()),
1783
      "hypervisor_type": self.sstore.GetHypervisorType(),
1784
      }
1785

    
1786
    return result
1787

    
1788

    
1789
class LUDumpClusterConfig(NoHooksLU):
1790
  """Return a text-representation of the cluster-config.
1791

1792
  """
1793
  _OP_REQP = []
1794
  REQ_BGL = False
1795

    
1796
  def ExpandNames(self):
1797
    self.needed_locks = {}
1798

    
1799
  def CheckPrereq(self):
1800
    """No prerequisites.
1801

1802
    """
1803
    pass
1804

    
1805
  def Exec(self, feedback_fn):
1806
    """Dump a representation of the cluster config to the standard output.
1807

1808
    """
1809
    return self.cfg.DumpConfig()
1810

    
1811

    
1812
class LUActivateInstanceDisks(NoHooksLU):
1813
  """Bring up an instance's disks.
1814

1815
  """
1816
  _OP_REQP = ["instance_name"]
1817

    
1818
  def CheckPrereq(self):
1819
    """Check prerequisites.
1820

1821
    This checks that the instance is in the cluster.
1822

1823
    """
1824
    instance = self.cfg.GetInstanceInfo(
1825
      self.cfg.ExpandInstanceName(self.op.instance_name))
1826
    if instance is None:
1827
      raise errors.OpPrereqError("Instance '%s' not known" %
1828
                                 self.op.instance_name)
1829
    self.instance = instance
1830

    
1831

    
1832
  def Exec(self, feedback_fn):
1833
    """Activate the disks.
1834

1835
    """
1836
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1837
    if not disks_ok:
1838
      raise errors.OpExecError("Cannot activate block devices")
1839

    
1840
    return disks_info
1841

    
1842

    
1843
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1844
  """Prepare the block devices for an instance.
1845

1846
  This sets up the block devices on all nodes.
1847

1848
  Args:
1849
    instance: a ganeti.objects.Instance object
1850
    ignore_secondaries: if true, errors on secondary nodes won't result
1851
                        in an error return from the function
1852

1853
  Returns:
1854
    false if the operation failed
1855
    list of (host, instance_visible_name, node_visible_name) if the operation
1856
         succeeded with the mapping from node devices to instance devices
1857
  """
1858
  device_info = []
1859
  disks_ok = True
1860
  iname = instance.name
1861
  # With the two passes mechanism we try to reduce the window of
1862
  # opportunity for the race condition of switching DRBD to primary
1863
  # before handshaking occurred, but we do not eliminate it
1864

    
1865
  # The proper fix would be to wait (with some limits) until the
1866
  # connection has been made and drbd transitions from WFConnection
1867
  # into any other network-connected state (Connected, SyncTarget,
1868
  # SyncSource, etc.)
1869

    
1870
  # 1st pass, assemble on all nodes in secondary mode
1871
  for inst_disk in instance.disks:
1872
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1873
      cfg.SetDiskID(node_disk, node)
1874
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1875
      if not result:
1876
        logger.Error("could not prepare block device %s on node %s"
1877
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1878
        if not ignore_secondaries:
1879
          disks_ok = False
1880

    
1881
  # FIXME: race condition on drbd migration to primary
1882

    
1883
  # 2nd pass, do only the primary node
1884
  for inst_disk in instance.disks:
1885
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1886
      if node != instance.primary_node:
1887
        continue
1888
      cfg.SetDiskID(node_disk, node)
1889
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1890
      if not result:
1891
        logger.Error("could not prepare block device %s on node %s"
1892
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1893
        disks_ok = False
1894
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1895

    
1896
  # leave the disks configured for the primary node
1897
  # this is a workaround that would be fixed better by
1898
  # improving the logical/physical id handling
1899
  for disk in instance.disks:
1900
    cfg.SetDiskID(disk, instance.primary_node)
1901

    
1902
  return disks_ok, device_info
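

# Editorial note (not part of the original module): the device_info list
# built above holds one (node, iv_name, result) tuple per disk, taken on
# the primary node during the second pass, e.g. for a DRBD8-based instance
# (values hypothetical):
#
#   (True, [("node1.example.com", "sda", "/dev/drbd0"),
#           ("node1.example.com", "sdb", "/dev/drbd1")])
#
# LUActivateInstanceDisks simply returns this list to its caller.

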
def _StartInstanceDisks(cfg, instance, force):
1906
  """Start the disks of an instance.
1907

1908
  """
1909
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1910
                                           ignore_secondaries=force)
1911
  if not disks_ok:
1912
    _ShutdownInstanceDisks(instance, cfg)
1913
    if force is not None and not force:
1914
      logger.Error("If the message above refers to a secondary node,"
1915
                   " you can retry the operation using '--force'.")
1916
    raise errors.OpExecError("Disk consistency error")
1917

    
1918

    
1919
class LUDeactivateInstanceDisks(NoHooksLU):
1920
  """Shutdown an instance's disks.
1921

1922
  """
1923
  _OP_REQP = ["instance_name"]
1924

    
1925
  def CheckPrereq(self):
1926
    """Check prerequisites.
1927

1928
    This checks that the instance is in the cluster.
1929

1930
    """
1931
    instance = self.cfg.GetInstanceInfo(
1932
      self.cfg.ExpandInstanceName(self.op.instance_name))
1933
    if instance is None:
1934
      raise errors.OpPrereqError("Instance '%s' not known" %
1935
                                 self.op.instance_name)
1936
    self.instance = instance
1937

    
1938
  def Exec(self, feedback_fn):
1939
    """Deactivate the disks
1940

1941
    """
1942
    instance = self.instance
1943
    ins_l = rpc.call_instance_list([instance.primary_node])
1944
    ins_l = ins_l[instance.primary_node]
1945
    if not type(ins_l) is list:
1946
      raise errors.OpExecError("Can't contact node '%s'" %
1947
                               instance.primary_node)
1948

    
1949
    if self.instance.name in ins_l:
1950
      raise errors.OpExecError("Instance is running, can't shutdown"
1951
                               " block devices.")
1952

    
1953
    _ShutdownInstanceDisks(instance, self.cfg)
1954

    
1955

    
1956
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1957
  """Shutdown block devices of an instance.
1958

1959
  This does the shutdown on all nodes of the instance.
1960

1961
  If ignore_primary is true, errors on the primary node are
  ignored.
1963

1964
  """
1965
  result = True
1966
  for disk in instance.disks:
1967
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1968
      cfg.SetDiskID(top_disk, node)
1969
      if not rpc.call_blockdev_shutdown(node, top_disk):
1970
        logger.Error("could not shutdown block device %s on node %s" %
1971
                     (disk.iv_name, node))
1972
        if not ignore_primary or node != instance.primary_node:
1973
          result = False
1974
  return result
1975

    
1976

    
1977
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1978
  """Checks if a node has enough free memory.
1979

1980
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
1983
  exception.
1984

1985
  Args:
1986
    - cfg: a ConfigWriter instance
1987
    - node: the node name
1988
    - reason: string to use in the error message
1989
    - requested: the amount of memory in MiB
1990

1991
  """
1992
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1993
  if not nodeinfo or not isinstance(nodeinfo, dict):
1994
    raise errors.OpPrereqError("Could not contact node %s for resource"
1995
                             " information" % (node,))
1996

    
1997
  free_mem = nodeinfo[node].get('memory_free')
1998
  if not isinstance(free_mem, int):
1999
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2000
                             " was '%s'" % (node, free_mem))
2001
  if requested > free_mem:
2002
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2003
                             " needed %s MiB, available %s MiB" %
2004
                             (node, reason, requested, free_mem))
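

# Usage sketch (editorial addition): the start and failover LUs below call
# this helper before touching the instance, e.g.
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# so the operation is refused with an OpPrereqError when the target node
# cannot provide instance.memory MiB of free memory.

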
class LUStartupInstance(LogicalUnit):
2008
  """Starts an instance.
2009

2010
  """
2011
  HPATH = "instance-start"
2012
  HTYPE = constants.HTYPE_INSTANCE
2013
  _OP_REQP = ["instance_name", "force"]
2014
  REQ_BGL = False
2015

    
2016
  def ExpandNames(self):
2017
    self._ExpandAndLockInstance()
2018
    self.needed_locks[locking.LEVEL_NODE] = []
2019
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2020

    
2021
  def DeclareLocks(self, level):
2022
    if level == locking.LEVEL_NODE:
2023
      self._LockInstancesNodes()
2024

    
2025
  def BuildHooksEnv(self):
2026
    """Build hooks env.
2027

2028
    This runs on master, primary and secondary nodes of the instance.
2029

2030
    """
2031
    env = {
2032
      "FORCE": self.op.force,
2033
      }
2034
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2035
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2036
          list(self.instance.secondary_nodes))
2037
    return env, nl, nl
2038

    
2039
  def CheckPrereq(self):
2040
    """Check prerequisites.
2041

2042
    This checks that the instance is in the cluster.
2043

2044
    """
2045
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2046
    assert self.instance is not None, \
2047
      "Cannot retrieve locked instance %s" % self.op.instance_name
2048

    
2049
    # check bridges existence
2050
    _CheckInstanceBridgesExist(instance)
2051

    
2052
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2053
                         "starting instance %s" % instance.name,
2054
                         instance.memory)
2055

    
2056
  def Exec(self, feedback_fn):
2057
    """Start the instance.
2058

2059
    """
2060
    instance = self.instance
2061
    force = self.op.force
2062
    extra_args = getattr(self.op, "extra_args", "")
2063

    
2064
    self.cfg.MarkInstanceUp(instance.name)
2065

    
2066
    node_current = instance.primary_node
2067

    
2068
    _StartInstanceDisks(self.cfg, instance, force)
2069

    
2070
    if not rpc.call_instance_start(node_current, instance, extra_args):
2071
      _ShutdownInstanceDisks(instance, self.cfg)
2072
      raise errors.OpExecError("Could not start instance")
2073

    
2074

    
2075
class LURebootInstance(LogicalUnit):
2076
  """Reboot an instance.
2077

2078
  """
2079
  HPATH = "instance-reboot"
2080
  HTYPE = constants.HTYPE_INSTANCE
2081
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2082
  REQ_BGL = False
2083

    
2084
  def ExpandNames(self):
2085
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2086
                                   constants.INSTANCE_REBOOT_HARD,
2087
                                   constants.INSTANCE_REBOOT_FULL]:
2088
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2089
                                  (constants.INSTANCE_REBOOT_SOFT,
2090
                                   constants.INSTANCE_REBOOT_HARD,
2091
                                   constants.INSTANCE_REBOOT_FULL))
2092
    self._ExpandAndLockInstance()
2093
    self.needed_locks[locking.LEVEL_NODE] = []
2094
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2095

    
2096
  def DeclareLocks(self, level):
2097
    if level == locking.LEVEL_NODE:
2098
      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2099
      self._LockInstancesNodes()
2100

    
2101
  def BuildHooksEnv(self):
2102
    """Build hooks env.
2103

2104
    This runs on master, primary and secondary nodes of the instance.
2105

2106
    """
2107
    env = {
2108
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2109
      }
2110
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2111
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2112
          list(self.instance.secondary_nodes))
2113
    return env, nl, nl
2114

    
2115
  def CheckPrereq(self):
2116
    """Check prerequisites.
2117

2118
    This checks that the instance is in the cluster.
2119

2120
    """
2121
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2122
    assert self.instance is not None, \
2123
      "Cannot retrieve locked instance %s" % self.op.instance_name
2124

    
2125
    # check bridges existence
2126
    _CheckInstanceBridgesExist(instance)
2127

    
2128
  def Exec(self, feedback_fn):
2129
    """Reboot the instance.
2130

2131
    """
2132
    instance = self.instance
2133
    ignore_secondaries = self.op.ignore_secondaries
2134
    reboot_type = self.op.reboot_type
2135
    extra_args = getattr(self.op, "extra_args", "")
2136

    
2137
    node_current = instance.primary_node
2138

    
2139
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2140
                       constants.INSTANCE_REBOOT_HARD]:
2141
      if not rpc.call_instance_reboot(node_current, instance,
2142
                                      reboot_type, extra_args):
2143
        raise errors.OpExecError("Could not reboot instance")
2144
    else:
2145
      if not rpc.call_instance_shutdown(node_current, instance):
2146
        raise errors.OpExecError("could not shutdown instance for full reboot")
2147
      _ShutdownInstanceDisks(instance, self.cfg)
2148
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2149
      if not rpc.call_instance_start(node_current, instance, extra_args):
2150
        _ShutdownInstanceDisks(instance, self.cfg)
2151
        raise errors.OpExecError("Could not start instance for full reboot")
2152

    
2153
    self.cfg.MarkInstanceUp(instance.name)
2154

    
2155

    
2156
class LUShutdownInstance(LogicalUnit):
2157
  """Shutdown an instance.
2158

2159
  """
2160
  HPATH = "instance-stop"
2161
  HTYPE = constants.HTYPE_INSTANCE
2162
  _OP_REQP = ["instance_name"]
2163
  REQ_BGL = False
2164

    
2165
  def ExpandNames(self):
2166
    self._ExpandAndLockInstance()
2167
    self.needed_locks[locking.LEVEL_NODE] = []
2168
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2169

    
2170
  def DeclareLocks(self, level):
2171
    if level == locking.LEVEL_NODE:
2172
      self._LockInstancesNodes()
2173

    
2174
  def BuildHooksEnv(self):
2175
    """Build hooks env.
2176

2177
    This runs on master, primary and secondary nodes of the instance.
2178

2179
    """
2180
    env = _BuildInstanceHookEnvByObject(self.instance)
2181
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2182
          list(self.instance.secondary_nodes))
2183
    return env, nl, nl
2184

    
2185
  def CheckPrereq(self):
2186
    """Check prerequisites.
2187

2188
    This checks that the instance is in the cluster.
2189

2190
    """
2191
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2192
    assert self.instance is not None, \
2193
      "Cannot retrieve locked instance %s" % self.op.instance_name
2194

    
2195
  def Exec(self, feedback_fn):
2196
    """Shutdown the instance.
2197

2198
    """
2199
    instance = self.instance
2200
    node_current = instance.primary_node
2201
    self.cfg.MarkInstanceDown(instance.name)
2202
    if not rpc.call_instance_shutdown(node_current, instance):
2203
      logger.Error("could not shutdown instance")
2204

    
2205
    _ShutdownInstanceDisks(instance, self.cfg)
2206

    
2207

    
2208
class LUReinstallInstance(LogicalUnit):
2209
  """Reinstall an instance.
2210

2211
  """
2212
  HPATH = "instance-reinstall"
2213
  HTYPE = constants.HTYPE_INSTANCE
2214
  _OP_REQP = ["instance_name"]
2215
  REQ_BGL = False
2216

    
2217
  def ExpandNames(self):
2218
    self._ExpandAndLockInstance()
2219
    self.needed_locks[locking.LEVEL_NODE] = []
2220
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2221

    
2222
  def DeclareLocks(self, level):
2223
    if level == locking.LEVEL_NODE:
2224
      self._LockInstancesNodes()
2225

    
2226
  def BuildHooksEnv(self):
2227
    """Build hooks env.
2228

2229
    This runs on master, primary and secondary nodes of the instance.
2230

2231
    """
2232
    env = _BuildInstanceHookEnvByObject(self.instance)
2233
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2234
          list(self.instance.secondary_nodes))
2235
    return env, nl, nl
2236

    
2237
  def CheckPrereq(self):
2238
    """Check prerequisites.
2239

2240
    This checks that the instance is in the cluster and is not running.
2241

2242
    """
2243
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2244
    assert instance is not None, \
2245
      "Cannot retrieve locked instance %s" % self.op.instance_name
2246

    
2247
    if instance.disk_template == constants.DT_DISKLESS:
2248
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2249
                                 self.op.instance_name)
2250
    if instance.status != "down":
2251
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2252
                                 self.op.instance_name)
2253
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2254
    if remote_info:
2255
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2256
                                 (self.op.instance_name,
2257
                                  instance.primary_node))
2258

    
2259
    self.op.os_type = getattr(self.op, "os_type", None)
2260
    if self.op.os_type is not None:
2261
      # OS verification
2262
      pnode = self.cfg.GetNodeInfo(
2263
        self.cfg.ExpandNodeName(instance.primary_node))
2264
      if pnode is None:
2265
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2266
                                   instance.primary_node)
2267
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2268
      if not os_obj:
2269
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2270
                                   " primary node"  % self.op.os_type)
2271

    
2272
    self.instance = instance
2273

    
2274
  def Exec(self, feedback_fn):
2275
    """Reinstall the instance.
2276

2277
    """
2278
    inst = self.instance
2279

    
2280
    if self.op.os_type is not None:
2281
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2282
      inst.os = self.op.os_type
2283
      self.cfg.AddInstance(inst)
2284

    
2285
    _StartInstanceDisks(self.cfg, inst, None)
2286
    try:
2287
      feedback_fn("Running the instance OS create scripts...")
2288
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2289
        raise errors.OpExecError("Could not install OS for instance %s"
2290
                                 " on node %s" %
2291
                                 (inst.name, inst.primary_node))
2292
    finally:
2293
      _ShutdownInstanceDisks(inst, self.cfg)
2294

    
2295

    
2296
class LURenameInstance(LogicalUnit):
2297
  """Rename an instance.
2298

2299
  """
2300
  HPATH = "instance-rename"
2301
  HTYPE = constants.HTYPE_INSTANCE
2302
  _OP_REQP = ["instance_name", "new_name"]
2303

    
2304
  def BuildHooksEnv(self):
2305
    """Build hooks env.
2306

2307
    This runs on master, primary and secondary nodes of the instance.
2308

2309
    """
2310
    env = _BuildInstanceHookEnvByObject(self.instance)
2311
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2312
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2313
          list(self.instance.secondary_nodes))
2314
    return env, nl, nl
2315

    
2316
  def CheckPrereq(self):
2317
    """Check prerequisites.
2318

2319
    This checks that the instance is in the cluster and is not running.
2320

2321
    """
2322
    instance = self.cfg.GetInstanceInfo(
2323
      self.cfg.ExpandInstanceName(self.op.instance_name))
2324
    if instance is None:
2325
      raise errors.OpPrereqError("Instance '%s' not known" %
2326
                                 self.op.instance_name)
2327
    if instance.status != "down":
2328
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2329
                                 self.op.instance_name)
2330
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2331
    if remote_info:
2332
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2333
                                 (self.op.instance_name,
2334
                                  instance.primary_node))
2335
    self.instance = instance
2336

    
2337
    # new name verification
2338
    name_info = utils.HostInfo(self.op.new_name)
2339

    
2340
    self.op.new_name = new_name = name_info.name
2341
    instance_list = self.cfg.GetInstanceList()
2342
    if new_name in instance_list:
2343
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2344
                                 new_name)
2345

    
2346
    if not getattr(self.op, "ignore_ip", False):
2347
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2348
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2349
                                   (name_info.ip, new_name))
2350

    
2351

    
2352
  def Exec(self, feedback_fn):
    """Rename the instance.
2354

2355
    """
2356
    inst = self.instance
2357
    old_name = inst.name
2358

    
2359
    if inst.disk_template == constants.DT_FILE:
2360
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2361

    
2362
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2363
    # Change the instance lock. This is definitely safe while we hold the BGL
2364
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2365
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2366

    
2367
    # re-read the instance from the configuration after rename
2368
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2369

    
2370
    if inst.disk_template == constants.DT_FILE:
2371
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2372
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2373
                                                old_file_storage_dir,
2374
                                                new_file_storage_dir)
2375

    
2376
      if not result:
2377
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2378
                                 " directory '%s' to '%s' (but the instance"
2379
                                 " has been renamed in Ganeti)" % (
2380
                                 inst.primary_node, old_file_storage_dir,
2381
                                 new_file_storage_dir))
2382

    
2383
      if not result[0]:
2384
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2385
                                 " (but the instance has been renamed in"
2386
                                 " Ganeti)" % (old_file_storage_dir,
2387
                                               new_file_storage_dir))
2388

    
2389
    _StartInstanceDisks(self.cfg, inst, None)
2390
    try:
2391
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2392
                                          "sda", "sdb"):
2393
        msg = ("Could not run OS rename script for instance %s on node %s (but the"
2394
               " instance has been renamed in Ganeti)" %
2395
               (inst.name, inst.primary_node))
2396
        logger.Error(msg)
2397
    finally:
2398
      _ShutdownInstanceDisks(inst, self.cfg)
2399

    
2400

    
2401
class LURemoveInstance(LogicalUnit):
2402
  """Remove an instance.
2403

2404
  """
2405
  HPATH = "instance-remove"
2406
  HTYPE = constants.HTYPE_INSTANCE
2407
  _OP_REQP = ["instance_name", "ignore_failures"]
2408

    
2409
  def BuildHooksEnv(self):
2410
    """Build hooks env.
2411

2412
    This runs on master, primary and secondary nodes of the instance.
2413

2414
    """
2415
    env = _BuildInstanceHookEnvByObject(self.instance)
2416
    nl = [self.sstore.GetMasterNode()]
2417
    return env, nl, nl
2418

    
2419
  def CheckPrereq(self):
2420
    """Check prerequisites.
2421

2422
    This checks that the instance is in the cluster.
2423

2424
    """
2425
    instance = self.cfg.GetInstanceInfo(
2426
      self.cfg.ExpandInstanceName(self.op.instance_name))
2427
    if instance is None:
2428
      raise errors.OpPrereqError("Instance '%s' not known" %
2429
                                 self.op.instance_name)
2430
    self.instance = instance
2431

    
2432
  def Exec(self, feedback_fn):
2433
    """Remove the instance.
2434

2435
    """
2436
    instance = self.instance
2437
    logger.Info("shutting down instance %s on node %s" %
2438
                (instance.name, instance.primary_node))
2439

    
2440
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2441
      if self.op.ignore_failures:
2442
        feedback_fn("Warning: can't shutdown instance")
2443
      else:
2444
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2445
                                 (instance.name, instance.primary_node))
2446

    
2447
    logger.Info("removing block devices for instance %s" % instance.name)
2448

    
2449
    if not _RemoveDisks(instance, self.cfg):
2450
      if self.op.ignore_failures:
2451
        feedback_fn("Warning: can't remove instance's disks")
2452
      else:
2453
        raise errors.OpExecError("Can't remove instance's disks")
2454

    
2455
    logger.Info("removing instance %s out of cluster config" % instance.name)
2456

    
2457
    self.cfg.RemoveInstance(instance.name)
2458
    # Remove the new instance from the Ganeti Lock Manager
2459
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2460

    
2461

    
2462
class LUQueryInstances(NoHooksLU):
2463
  """Logical unit for querying instances.
2464

2465
  """
2466
  _OP_REQP = ["output_fields", "names"]
2467
  REQ_BGL = False
2468

    
2469
  def ExpandNames(self):
2470
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2471
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2472
                               "admin_state", "admin_ram",
2473
                               "disk_template", "ip", "mac", "bridge",
2474
                               "sda_size", "sdb_size", "vcpus", "tags"],
2475
                       dynamic=self.dynamic_fields,
2476
                       selected=self.op.output_fields)
2477

    
2478
    self.needed_locks = {}
2479
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2480
    self.share_locks[locking.LEVEL_NODE] = 1
2481

    
2482
    # TODO: we could lock instances (and nodes) only if the user asked for
2483
    # dynamic fields. For that we need atomic ways to get info for a group of
2484
    # instances from the config, though.
2485
    if not self.op.names:
2486
      self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
2487
    else:
2488
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2489
        _GetWantedInstances(self, self.op.names)
2490

    
2491
    self.needed_locks[locking.LEVEL_NODE] = []
2492
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2493

    
2494
  def DeclareLocks(self, level):
2495
    # TODO: locking of nodes could be avoided when not querying them
2496
    if level == locking.LEVEL_NODE:
2497
      self._LockInstancesNodes()
2498

    
2499
  def CheckPrereq(self):
2500
    """Check prerequisites.
2501

2502
    """
2503
    # This of course is valid only if we locked the instances
2504
    self.wanted = self.needed_locks[locking.LEVEL_INSTANCE]
2505

    
2506
  def Exec(self, feedback_fn):
2507
    """Computes the list of nodes and their attributes.
2508

2509
    """
2510
    instance_names = self.wanted
2511
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2512
                     in instance_names]
2513

    
2514
    # begin data gathering
2515

    
2516
    nodes = frozenset([inst.primary_node for inst in instance_list])
2517

    
2518
    bad_nodes = []
2519
    if self.dynamic_fields.intersection(self.op.output_fields):
2520
      live_data = {}
2521
      node_data = rpc.call_all_instances_info(nodes)
2522
      for name in nodes:
2523
        result = node_data[name]
2524
        if result:
2525
          live_data.update(result)
2526
        elif result == False:
2527
          bad_nodes.append(name)
2528
        # else no instance is alive
2529
    else:
2530
      live_data = dict([(name, {}) for name in instance_names])
2531

    
2532
    # end data gathering
2533

    
2534
    output = []
2535
    for instance in instance_list:
2536
      iout = []
2537
      for field in self.op.output_fields:
2538
        if field == "name":
2539
          val = instance.name
2540
        elif field == "os":
2541
          val = instance.os
2542
        elif field == "pnode":
2543
          val = instance.primary_node
2544
        elif field == "snodes":
2545
          val = list(instance.secondary_nodes)
2546
        elif field == "admin_state":
2547
          val = (instance.status != "down")
2548
        elif field == "oper_state":
2549
          if instance.primary_node in bad_nodes:
2550
            val = None
2551
          else:
2552
            val = bool(live_data.get(instance.name))
2553
        elif field == "status":
2554
          if instance.primary_node in bad_nodes:
2555
            val = "ERROR_nodedown"
2556
          else:
2557
            running = bool(live_data.get(instance.name))
2558
            if running:
2559
              if instance.status != "down":
2560
                val = "running"
2561
              else:
2562
                val = "ERROR_up"
2563
            else:
2564
              if instance.status != "down":
2565
                val = "ERROR_down"
2566
              else:
2567
                val = "ADMIN_down"
2568
        elif field == "admin_ram":
2569
          val = instance.memory
2570
        elif field == "oper_ram":
2571
          if instance.primary_node in bad_nodes:
2572
            val = None
2573
          elif instance.name in live_data:
2574
            val = live_data[instance.name].get("memory", "?")
2575
          else:
2576
            val = "-"
2577
        elif field == "disk_template":
2578
          val = instance.disk_template
2579
        elif field == "ip":
2580
          val = instance.nics[0].ip
2581
        elif field == "bridge":
2582
          val = instance.nics[0].bridge
2583
        elif field == "mac":
2584
          val = instance.nics[0].mac
2585
        elif field == "sda_size" or field == "sdb_size":
2586
          disk = instance.FindDisk(field[:3])
2587
          if disk is None:
2588
            val = None
2589
          else:
2590
            val = disk.size
2591
        elif field == "vcpus":
2592
          val = instance.vcpus
2593
        elif field == "tags":
2594
          val = list(instance.GetTags())
2595
        else:
2596
          raise errors.ParameterError(field)
2597
        iout.append(val)
2598
      output.append(iout)
2599

    
2600
    return output
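

# Editorial summary of the "status" field computed in LUQueryInstances.Exec
# above (admin state "up" means instance.status != "down"):
#
#   primary node reachable | actually running | admin state | status
#   -----------------------+------------------+-------------+----------------
#   no                     | unknown          | any         | ERROR_nodedown
#   yes                    | yes              | up          | running
#   yes                    | yes              | down        | ERROR_up
#   yes                    | no               | up          | ERROR_down
#   yes                    | no               | down        | ADMIN_down

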
class LUFailoverInstance(LogicalUnit):
2604
  """Failover an instance.
2605

2606
  """
2607
  HPATH = "instance-failover"
2608
  HTYPE = constants.HTYPE_INSTANCE
2609
  _OP_REQP = ["instance_name", "ignore_consistency"]
2610
  REQ_BGL = False
2611

    
2612
  def ExpandNames(self):
2613
    self._ExpandAndLockInstance()
2614
    self.needed_locks[locking.LEVEL_NODE] = []
2615
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2616

    
2617
  def DeclareLocks(self, level):
2618
    if level == locking.LEVEL_NODE:
2619
      self._LockInstancesNodes()
2620

    
2621
  def BuildHooksEnv(self):
2622
    """Build hooks env.
2623

2624
    This runs on master, primary and secondary nodes of the instance.
2625

2626
    """
2627
    env = {
2628
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2629
      }
2630
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2631
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2632
    return env, nl, nl
2633

    
2634
  def CheckPrereq(self):
2635
    """Check prerequisites.
2636

2637
    This checks that the instance is in the cluster.
2638

2639
    """
2640
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2641
    assert self.instance is not None, \
2642
      "Cannot retrieve locked instance %s" % self.op.instance_name
2643

    
2644
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2645
      raise errors.OpPrereqError("Instance's disk layout is not"
2646
                                 " network mirrored, cannot failover.")
2647

    
2648
    secondary_nodes = instance.secondary_nodes
2649
    if not secondary_nodes:
2650
      raise errors.ProgrammerError("no secondary node but using "
2651
                                   "a mirrored disk template")
2652

    
2653
    target_node = secondary_nodes[0]
2654
    # check memory requirements on the secondary node
2655
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2656
                         instance.name, instance.memory)
2657

    
2658
    # check bridge existence
2659
    brlist = [nic.bridge for nic in instance.nics]
2660
    if not rpc.call_bridges_exist(target_node, brlist):
2661
      raise errors.OpPrereqError("One or more target bridges %s do not"
2662
                                 " exist on destination node '%s'" %
2663
                                 (brlist, target_node))
2664

    
2665
  def Exec(self, feedback_fn):
2666
    """Failover an instance.
2667

2668
    The failover is done by shutting it down on its present node and
2669
    starting it on the secondary.
2670

2671
    """
2672
    instance = self.instance
2673

    
2674
    source_node = instance.primary_node
2675
    target_node = instance.secondary_nodes[0]
2676

    
2677
    feedback_fn("* checking disk consistency between source and target")
2678
    for dev in instance.disks:
2679
      # for drbd, these are drbd over lvm
2680
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2681
        if instance.status == "up" and not self.op.ignore_consistency:
2682
          raise errors.OpExecError("Disk %s is degraded on target node,"
2683
                                   " aborting failover." % dev.iv_name)
2684

    
2685
    feedback_fn("* shutting down instance on source node")
2686
    logger.Info("Shutting down instance %s on node %s" %
2687
                (instance.name, source_node))
2688

    
2689
    if not rpc.call_instance_shutdown(source_node, instance):
2690
      if self.op.ignore_consistency:
2691
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2692
                     " anyway. Please make sure node %s is down"  %
2693
                     (instance.name, source_node, source_node))
2694
      else:
2695
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2696
                                 (instance.name, source_node))
2697

    
2698
    feedback_fn("* deactivating the instance's disks on source node")
2699
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2700
      raise errors.OpExecError("Can't shut down the instance's disks.")
2701

    
2702
    instance.primary_node = target_node
2703
    # distribute new instance config to the other nodes
2704
    self.cfg.Update(instance)
2705

    
2706
    # Only start the instance if it's marked as up
2707
    if instance.status == "up":
2708
      feedback_fn("* activating the instance's disks on target node")
2709
      logger.Info("Starting instance %s on node %s" %
2710
                  (instance.name, target_node))
2711

    
2712
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2713
                                               ignore_secondaries=True)
2714
      if not disks_ok:
2715
        _ShutdownInstanceDisks(instance, self.cfg)
2716
        raise errors.OpExecError("Can't activate the instance's disks")
2717

    
2718
      feedback_fn("* starting the instance on the target node")
2719
      if not rpc.call_instance_start(target_node, instance, None):
2720
        _ShutdownInstanceDisks(instance, self.cfg)
2721
        raise errors.OpExecError("Could not start instance %s on node %s." %
2722
                                 (instance.name, target_node))
2723

    
2724

    
2725
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2726
  """Create a tree of block devices on the primary node.
2727

2728
  This always creates all devices.
2729

2730
  """
2731
  if device.children:
2732
    for child in device.children:
2733
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2734
        return False
2735

    
2736
  cfg.SetDiskID(device, node)
2737
  new_id = rpc.call_blockdev_create(node, device, device.size,
2738
                                    instance.name, True, info)
2739
  if not new_id:
2740
    return False
2741
  if device.physical_id is None:
2742
    device.physical_id = new_id
2743
  return True
2744

    
2745

    
2746
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2747
  """Create a tree of block devices on a secondary node.
2748

2749
  If this device type has to be created on secondaries, create it and
2750
  all its children.
2751

2752
  If not, just recurse to children keeping the same 'force' value.
2753

2754
  """
2755
  if device.CreateOnSecondary():
2756
    force = True
2757
  if device.children:
2758
    for child in device.children:
2759
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2760
                                        child, force, info):
2761
        return False
2762

    
2763
  if not force:
2764
    return True
2765
  cfg.SetDiskID(device, node)
2766
  new_id = rpc.call_blockdev_create(node, device, device.size,
2767
                                    instance.name, False, info)
2768
  if not new_id:
2769
    return False
2770
  if device.physical_id is None:
2771
    device.physical_id = new_id
2772
  return True
2773

    
2774

    
2775
def _GenerateUniqueNames(cfg, exts):
  """Generate a set of suitable LV names.

  This will generate one logical volume name per given extension, each
  prefixed by a unique ID obtained from the configuration.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
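

# Editorial note (not part of the original module): with
# exts == [".sda", ".sdb"] this returns two names of the form
#
#   ["<unique-id-1>.sda", "<unique-id-2>.sdb"]
#
# where each <unique-id-N> is a fresh value from cfg.GenerateUniqueID();
# callers below use these strings as logical volume names.

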
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2789
  """Generate a drbd8 device complete with its children.
2790

2791
  """
2792
  port = cfg.AllocatePort()
2793
  vgname = cfg.GetVGName()
2794
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2795
                          logical_id=(vgname, names[0]))
2796
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2797
                          logical_id=(vgname, names[1]))
2798
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2799
                          logical_id = (primary, secondary, port),
2800
                          children = [dev_data, dev_meta],
2801
                          iv_name=iv_name)
2802
  return drbd_dev
2803

    
2804

    
2805
def _GenerateDiskTemplate(cfg, template_name,
2806
                          instance_name, primary_node,
2807
                          secondary_nodes, disk_sz, swap_sz,
2808
                          file_storage_dir, file_driver):
2809
  """Generate the entire disk layout for a given template type.
2810

2811
  """
2812
  #TODO: compute space requirements
2813

    
2814
  vgname = cfg.GetVGName()
2815
  if template_name == constants.DT_DISKLESS:
2816
    disks = []
2817
  elif template_name == constants.DT_PLAIN:
2818
    if len(secondary_nodes) != 0:
2819
      raise errors.ProgrammerError("Wrong template configuration")
2820

    
2821
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2822
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2823
                           logical_id=(vgname, names[0]),
2824
                           iv_name = "sda")
2825
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2826
                           logical_id=(vgname, names[1]),
2827
                           iv_name = "sdb")
2828
    disks = [sda_dev, sdb_dev]
2829
  elif template_name == constants.DT_DRBD8:
2830
    if len(secondary_nodes) != 1:
2831
      raise errors.ProgrammerError("Wrong template configuration")
2832
    remote_node = secondary_nodes[0]
2833
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2834
                                       ".sdb_data", ".sdb_meta"])
2835
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2836
                                         disk_sz, names[0:2], "sda")
2837
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2838
                                         swap_sz, names[2:4], "sdb")
2839
    disks = [drbd_sda_dev, drbd_sdb_dev]
2840
  elif template_name == constants.DT_FILE:
2841
    if len(secondary_nodes) != 0:
2842
      raise errors.ProgrammerError("Wrong template configuration")
2843

    
2844
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2845
                                iv_name="sda", logical_id=(file_driver,
2846
                                "%s/sda" % file_storage_dir))
2847
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2848
                                iv_name="sdb", logical_id=(file_driver,
2849
                                "%s/sdb" % file_storage_dir))
2850
    disks = [file_sda_dev, file_sdb_dev]
2851
  else:
2852
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2853
  return disks
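

# Editorial sketch (not part of the original module) of the tree returned
# for template_name == constants.DT_DRBD8; sizes in MB, "<id>" stands for
# the LV names produced by _GenerateUniqueNames above, and each branch
# gets its own port from cfg.AllocatePort():
#
#   sda: LD_DRBD8 (primary, secondary, port)
#          +- LD_LV "<id>.sda_data"  size = disk_sz
#          +- LD_LV "<id>.sda_meta"  size = 128
#   sdb: LD_DRBD8 (primary, secondary, port)
#          +- LD_LV "<id>.sdb_data"  size = swap_sz
#          +- LD_LV "<id>.sdb_meta"  size = 128

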
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
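

# Editorial example: for an instance named "inst1.example.com" the text
# stored in the disk metadata is "originstname+inst1.example.com".

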
def _CreateDisks(cfg, instance):
2864
  """Create all disks for an instance.
2865

2866
  This abstracts away some work from AddInstance.
2867

2868
  Args:
2869
    instance: the instance object
2870

2871
  Returns:
2872
    True or False showing the success of the creation process
2873

2874
  """
2875
  info = _GetInstanceInfoText(instance)
2876

    
2877
  if instance.disk_template == constants.DT_FILE:
2878
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2879
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2880
                                              file_storage_dir)
2881

    
2882
    if not result:
2883
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2884
      return False
2885

    
2886
    if not result[0]:
2887
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2888
      return False
2889

    
2890
  for device in instance.disks:
2891
    logger.Info("creating volume %s for instance %s" %
2892
                (device.iv_name, instance.name))
2893
    #HARDCODE
2894
    for secondary_node in instance.secondary_nodes:
2895
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2896
                                        device, False, info):
2897
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2898
                     (device.iv_name, device, secondary_node))
2899
        return False
2900
    #HARDCODE
2901
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2902
                                    instance, device, info):
2903
      logger.Error("failed to create volume %s on primary!" %
2904
                   device.iv_name)
2905
      return False
2906

    
2907
  return True
2908

    
2909

    
2910
def _RemoveDisks(instance, cfg):
2911
  """Remove all disks for an instance.
2912

2913
  This abstracts away some work from `AddInstance()` and
2914
  `RemoveInstance()`. Note that in case some of the devices couldn't
2915
  be removed, the removal will continue with the other ones (compare
2916
  with `_CreateDisks()`).
2917

2918
  Args:
2919
    instance: the instance object
2920

2921
  Returns:
2922
    True or False showing the success of the removal process
2923

2924
  """
2925
  logger.Info("removing block devices for instance %s" % instance.name)
2926

    
2927
  result = True
2928
  for device in instance.disks:
2929
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2930
      cfg.SetDiskID(disk, node)
2931
      if not rpc.call_blockdev_remove(node, disk):
2932
        logger.Error("could not remove block device %s on node %s,"
2933
                     " continuing anyway" %
2934
                     (device.iv_name, node))
2935
        result = False
2936

    
2937
  if instance.disk_template == constants.DT_FILE:
2938
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2939
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2940
                                            file_storage_dir):
2941
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2942
      result = False
2943

    
2944
  return result
2945

    
2946

    
2947
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2948
  """Compute disk size requirements in the volume group
2949

2950
  This is currently hard-coded for the two-drive layout.
2951

2952
  """
2953
  # Required free disk space as a function of disk and swap space
2954
  req_size_dict = {
2955
    constants.DT_DISKLESS: None,
2956
    constants.DT_PLAIN: disk_size + swap_size,
2957
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2958
    constants.DT_DRBD8: disk_size + swap_size + 256,
2959
    constants.DT_FILE: None,
2960
  }
2961

    
2962
  if disk_template not in req_size_dict:
2963
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2964
                                 " is unknown" %  disk_template)
2965

    
2966
  return req_size_dict[disk_template]
2967

    
2968

    
2969
class LUCreateInstance(LogicalUnit):
2970
  """Create an instance.
2971

2972
  """
2973
  HPATH = "instance-add"
2974
  HTYPE = constants.HTYPE_INSTANCE
2975
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2976
              "disk_template", "swap_size", "mode", "start", "vcpus",
2977
              "wait_for_sync", "ip_check", "mac"]
2978

    
2979
  def _RunAllocator(self):
2980
    """Run the allocator based on input opcode.
2981

2982
    """
2983
    disks = [{"size": self.op.disk_size, "mode": "w"},
2984
             {"size": self.op.swap_size, "mode": "w"}]
2985
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2986
             "bridge": self.op.bridge}]
2987
    ial = IAllocator(self.cfg, self.sstore,
2988
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2989
                     name=self.op.instance_name,
2990
                     disk_template=self.op.disk_template,
2991
                     tags=[],
2992
                     os=self.op.os_type,
2993
                     vcpus=self.op.vcpus,
2994
                     mem_size=self.op.mem_size,
2995
                     disks=disks,
2996
                     nics=nics,
2997
                     )
2998

    
2999
    ial.Run(self.op.iallocator)
3000

    
3001
    if not ial.success:
3002
      raise errors.OpPrereqError("Can't compute nodes using"
3003
                                 " iallocator '%s': %s" % (self.op.iallocator,
3004
                                                           ial.info))
3005
    if len(ial.nodes) != ial.required_nodes:
3006
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3007
                                 " of nodes (%s), required %s" %
3008
                                 (len(ial.nodes), ial.required_nodes))
3009
    self.op.pnode = ial.nodes[0]
3010
    logger.ToStdout("Selected nodes for the instance: %s" %
3011
                    (", ".join(ial.nodes),))
3012
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3013
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3014
    if ial.required_nodes == 2:
3015
      self.op.snode = ial.nodes[1]
3016

    
3017
  def BuildHooksEnv(self):
3018
    """Build hooks env.
3019

3020
    This runs on master, primary and secondary nodes of the instance.
3021

3022
    """
3023
    env = {
3024
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3025
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3026
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3027
      "INSTANCE_ADD_MODE": self.op.mode,
3028
      }
3029
    if self.op.mode == constants.INSTANCE_IMPORT:
3030
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3031
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3032
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3033

    
3034
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3035
      primary_node=self.op.pnode,
3036
      secondary_nodes=self.secondaries,
3037
      status=self.instance_status,
3038
      os_type=self.op.os_type,
3039
      memory=self.op.mem_size,
3040
      vcpus=self.op.vcpus,
3041
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3042
    ))
3043

    
3044
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3045
          self.secondaries)
3046
    return env, nl, nl
3047

    
3048

    
3049
  def CheckPrereq(self):
3050
    """Check prerequisites.
3051

3052
    """
3053
    # set optional parameters to none if they don't exist
3054
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3055
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3056
                 "vnc_bind_address"]:
3057
      if not hasattr(self.op, attr):
3058
        setattr(self.op, attr, None)
3059

    
3060
    if self.op.mode not in (constants.INSTANCE_CREATE,
3061
                            constants.INSTANCE_IMPORT):
3062
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3063
                                 self.op.mode)
3064

    
3065
    if (not self.cfg.GetVGName() and
3066
        self.op.disk_template not in constants.DTS_NOT_LVM):
3067
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3068
                                 " instances")
3069

    
3070
    if self.op.mode == constants.INSTANCE_IMPORT:
3071
      src_node = getattr(self.op, "src_node", None)
3072
      src_path = getattr(self.op, "src_path", None)
3073
      if src_node is None or src_path is None:
3074
        raise errors.OpPrereqError("Importing an instance requires source"
3075
                                   " node and path options")
3076
      src_node_full = self.cfg.ExpandNodeName(src_node)
3077
      if src_node_full is None:
3078
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3079
      self.op.src_node = src_node = src_node_full
3080

    
3081
      if not os.path.isabs(src_path):
3082
        raise errors.OpPrereqError("The source path must be absolute")
3083

    
3084
      export_info = rpc.call_export_info(src_node, src_path)
3085

    
3086
      if not export_info:
3087
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3088

    
3089
      if not export_info.has_section(constants.INISECT_EXP):
3090
        raise errors.ProgrammerError("Corrupted export config")
3091

    
3092
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3093
      if (int(ei_version) != constants.EXPORT_VERSION):
3094
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3095
                                   (ei_version, constants.EXPORT_VERSION))
3096

    
3097
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3098
        raise errors.OpPrereqError("Can't import instance with more than"
3099
                                   " one data disk")
3100

    
3101
      # FIXME: are the old os-es, disk sizes, etc. useful?
3102
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3103
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3104
                                                         'disk0_dump'))
3105
      self.src_image = diskimage
3106
    else: # INSTANCE_CREATE
3107
      if getattr(self.op, "os_type", None) is None:
3108
        raise errors.OpPrereqError("No guest OS specified")
3109

    
3110
    #### instance parameters check
3111

    
3112
    # disk template and mirror node verification
3113
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3114
      raise errors.OpPrereqError("Invalid disk template name")
3115

    
3116
    # instance name verification
3117
    hostname1 = utils.HostInfo(self.op.instance_name)
3118

    
3119
    self.op.instance_name = instance_name = hostname1.name
3120
    instance_list = self.cfg.GetInstanceList()
3121
    if instance_name in instance_list:
3122
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3123
                                 instance_name)
3124

    
3125
    # ip validity checks
3126
    ip = getattr(self.op, "ip", None)
3127
    if ip is None or ip.lower() == "none":
3128
      inst_ip = None
3129
    elif ip.lower() == "auto":
3130
      inst_ip = hostname1.ip
3131
    else:
3132
      if not utils.IsValidIP(ip):
3133
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3134
                                   " like a valid IP" % ip)
3135
      inst_ip = ip
3136
    self.inst_ip = self.op.ip = inst_ip
3137

    
3138
    if self.op.start and not self.op.ip_check:
3139
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3140
                                 " adding an instance in start mode")
3141

    
3142
    if self.op.ip_check:
3143
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3144
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3145
                                   (hostname1.ip, instance_name))
3146

    
3147
    # MAC address verification
3148
    if self.op.mac != "auto":
3149
      if not utils.IsValidMac(self.op.mac.lower()):
3150
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3151
                                   self.op.mac)
3152

    
3153
    # bridge verification
3154
    bridge = getattr(self.op, "bridge", None)
3155
    if bridge is None:
3156
      self.op.bridge = self.cfg.GetDefBridge()
3157
    else:
3158
      self.op.bridge = bridge
3159

    
3160
    # boot order verification
3161
    if self.op.hvm_boot_order is not None:
3162
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3163
        raise errors.OpPrereqError("invalid boot order specified,"
3164
                                   " must be one or more of [acdn]")
3165
    # file storage checks
3166
    if (self.op.file_driver and
3167
        not self.op.file_driver in constants.FILE_DRIVER):
3168
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3169
                                 self.op.file_driver)
3170

    
3171
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3172
      raise errors.OpPrereqError("File storage directory not a relative"
3173
                                 " path")
3174
    #### allocator run
3175

    
3176
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3177
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3178
                                 " node must be given")
3179

    
3180
    if self.op.iallocator is not None:
3181
      self._RunAllocator()
3182

    
3183
    #### node related checks
3184

    
3185
    # check primary node
3186
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3187
    if pnode is None:
3188
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3189
                                 self.op.pnode)
3190
    self.op.pnode = pnode.name
3191
    self.pnode = pnode
3192
    self.secondaries = []
3193

    
3194
    # mirror node verification
3195
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3196
      if getattr(self.op, "snode", None) is None:
3197
        raise errors.OpPrereqError("The networked disk templates need"
3198
                                   " a mirror node")
3199

    
3200
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3201
      if snode_name is None:
3202
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3203
                                   self.op.snode)
3204
      elif snode_name == pnode.name:
3205
        raise errors.OpPrereqError("The secondary node cannot be"
3206
                                   " the primary node.")
3207
      self.secondaries.append(snode_name)
3208

    
3209
    req_size = _ComputeDiskSize(self.op.disk_template,
3210
                                self.op.disk_size, self.op.swap_size)
3211

    
3212
    # Check lv size requirements
3213
    if req_size is not None:
3214
      nodenames = [pnode.name] + self.secondaries
3215
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3216
      for node in nodenames:
3217
        info = nodeinfo.get(node, None)
3218
        if not info:
3219
          raise errors.OpPrereqError("Cannot get current information"
3220
                                     " from node '%s'" % node)
3221
        vg_free = info.get('vg_free', None)
3222
        if not isinstance(vg_free, int):
3223
          raise errors.OpPrereqError("Can't compute free disk space on"
3224
                                     " node %s" % node)
3225
        if req_size > info['vg_free']:
3226
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3227
                                     " %d MB available, %d MB required" %
3228
                                     (node, info['vg_free'], req_size))
3229

    
3230
    # os verification
3231
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3232
    if not os_obj:
3233
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3234
                                 " primary node"  % self.op.os_type)
3235

    
3236
    if self.op.kernel_path == constants.VALUE_NONE:
3237
      raise errors.OpPrereqError("Can't set instance kernel to none")
3238

    
3239

    
3240
    # bridge check on primary node
3241
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3242
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3243
                                 " destination node '%s'" %
3244
                                 (self.op.bridge, pnode.name))
3245

    
3246
    # memory check on primary node
3247
    if self.op.start:
3248
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3249
                           "creating instance %s" % self.op.instance_name,
3250
                           self.op.mem_size)
3251

    
3252
    # hvm_cdrom_image_path verification
3253
    if self.op.hvm_cdrom_image_path is not None:
3254
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3255
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3256
                                   " be an absolute path or None, not %s" %
3257
                                   self.op.hvm_cdrom_image_path)
3258
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3259
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3260
                                   " regular file or a symlink pointing to"
3261
                                   " an existing regular file, not %s" %
3262
                                   self.op.hvm_cdrom_image_path)
3263

    
3264
    # vnc_bind_address verification
3265
    if self.op.vnc_bind_address is not None:
3266
      if not utils.IsValidIP(self.op.vnc_bind_address):
3267
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3268
                                   " like a valid IP address" %
3269
                                   self.op.vnc_bind_address)
3270

    
3271
    if self.op.start:
3272
      self.instance_status = 'up'
3273
    else:
3274
      self.instance_status = 'down'
3275

    
3276
  def Exec(self, feedback_fn):
3277
    """Create and add the instance to the cluster.
3278

3279
    """
3280
    instance = self.op.instance_name
3281
    pnode_name = self.pnode.name
3282

    
3283
    if self.op.mac == "auto":
3284
      mac_address = self.cfg.GenerateMAC()
3285
    else:
3286
      mac_address = self.op.mac
3287

    
3288
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3289
    if self.inst_ip is not None:
3290
      nic.ip = self.inst_ip
3291

    
3292
    ht_kind = self.sstore.GetHypervisorType()
3293
    if ht_kind in constants.HTS_REQ_PORT:
3294
      network_port = self.cfg.AllocatePort()
3295
    else:
3296
      network_port = None
3297

    
3298
    if self.op.vnc_bind_address is None:
3299
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3300

    
3301
    # this is needed because os.path.join does not accept None arguments
3302
    if self.op.file_storage_dir is None:
3303
      string_file_storage_dir = ""
3304
    else:
3305
      string_file_storage_dir = self.op.file_storage_dir
3306

    
3307
    # build the full file storage dir path
3308
    file_storage_dir = os.path.normpath(os.path.join(
3309
                                        self.sstore.GetFileStorageDir(),
3310
                                        string_file_storage_dir, instance))
3311

    
3312

    
3313
    disks = _GenerateDiskTemplate(self.cfg,
3314
                                  self.op.disk_template,
3315
                                  instance, pnode_name,
3316
                                  self.secondaries, self.op.disk_size,
3317
                                  self.op.swap_size,
3318
                                  file_storage_dir,
3319
                                  self.op.file_driver)
3320

    
3321
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3322
                            primary_node=pnode_name,
3323
                            memory=self.op.mem_size,
3324
                            vcpus=self.op.vcpus,
3325
                            nics=[nic], disks=disks,
3326
                            disk_template=self.op.disk_template,
3327
                            status=self.instance_status,
3328
                            network_port=network_port,
3329
                            kernel_path=self.op.kernel_path,
3330
                            initrd_path=self.op.initrd_path,
3331
                            hvm_boot_order=self.op.hvm_boot_order,
3332
                            hvm_acpi=self.op.hvm_acpi,
3333
                            hvm_pae=self.op.hvm_pae,
3334
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3335
                            vnc_bind_address=self.op.vnc_bind_address,
3336
                            )
3337

    
3338
    feedback_fn("* creating instance disks...")
3339
    if not _CreateDisks(self.cfg, iobj):
3340
      _RemoveDisks(iobj, self.cfg)
3341
      raise errors.OpExecError("Device creation failed, reverting...")
3342

    
3343
    feedback_fn("adding instance %s to cluster config" % instance)
3344

    
3345
    self.cfg.AddInstance(iobj)
3346
    # Add the new instance to the Ganeti Lock Manager
3347
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3348

    
3349
    if self.op.wait_for_sync:
3350
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3351
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3352
      # make sure the disks are not degraded (still sync-ing is ok)
3353
      time.sleep(15)
3354
      feedback_fn("* checking mirrors status")
3355
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3356
    else:
3357
      disk_abort = False
3358

    
3359
    if disk_abort:
3360
      _RemoveDisks(iobj, self.cfg)
3361
      self.cfg.RemoveInstance(iobj.name)
3362
      # Remove the new instance from the Ganeti Lock Manager
3363
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3364
      raise errors.OpExecError("There are some degraded disks for"
3365
                               " this instance")
3366

    
3367
    feedback_fn("creating os for instance %s on node %s" %
3368
                (instance, pnode_name))
3369

    
3370
    if iobj.disk_template != constants.DT_DISKLESS:
3371
      if self.op.mode == constants.INSTANCE_CREATE:
3372
        feedback_fn("* running the instance OS create scripts...")
3373
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3374
          raise errors.OpExecError("could not add os for instance %s"
3375
                                   " on node %s" %
3376
                                   (instance, pnode_name))
3377

    
3378
      elif self.op.mode == constants.INSTANCE_IMPORT:
3379
        feedback_fn("* running the instance OS import scripts...")
3380
        src_node = self.op.src_node
3381
        src_image = self.src_image
3382
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3383
                                                src_node, src_image):
3384
          raise errors.OpExecError("Could not import os for instance"
3385
                                   " %s on node %s" %
3386
                                   (instance, pnode_name))
3387
      else:
3388
        # also checked in the prereq part
3389
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3390
                                     % self.op.mode)
3391

    
3392
    if self.op.start:
3393
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3394
      feedback_fn("* starting instance...")
3395
      if not rpc.call_instance_start(pnode_name, iobj, None):
3396
        raise errors.OpExecError("Could not start instance")
3397

    
3398

    
3399
class LUConnectConsole(NoHooksLU):
3400
  """Connect to an instance's console.
3401

3402
  This is somewhat special in that it returns the command line that
3403
  you need to run on the master node in order to connect to the
3404
  console.
3405

3406
  """
3407
  _OP_REQP = ["instance_name"]
3408
  REQ_BGL = False
3409

    
3410
  def ExpandNames(self):
3411
    self._ExpandAndLockInstance()
3412

    
3413
  def CheckPrereq(self):
3414
    """Check prerequisites.
3415

3416
    This checks that the instance is in the cluster.
3417

3418
    """
3419
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3420
    assert self.instance is not None, \
3421
      "Cannot retrieve locked instance %s" % self.op.instance_name
3422

    
3423
  def Exec(self, feedback_fn):
3424
    """Connect to the console of an instance
3425

3426
    """
3427
    instance = self.instance
3428
    node = instance.primary_node
3429

    
3430
    node_insts = rpc.call_instance_list([node])[node]
3431
    if node_insts is False:
3432
      raise errors.OpExecError("Can't connect to node %s." % node)
3433

    
3434
    if instance.name not in node_insts:
3435
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3436

    
3437
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3438

    
3439
    hyper = hypervisor.GetHypervisor()
3440
    console_cmd = hyper.GetShellCommandForConsole(instance)
3441

    
3442
    # build ssh cmdline
3443
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3444

    
3445

    
3446
class LUReplaceDisks(LogicalUnit):
3447
  """Replace the disks of an instance.
3448

3449
  """
3450
  HPATH = "mirrors-replace"
3451
  HTYPE = constants.HTYPE_INSTANCE
3452
  _OP_REQP = ["instance_name", "mode", "disks"]
3453

    
3454
  def _RunAllocator(self):
3455
    """Compute a new secondary node using an IAllocator.
3456

3457
    """
3458
    ial = IAllocator(self.cfg, self.sstore,
3459
                     mode=constants.IALLOCATOR_MODE_RELOC,
3460
                     name=self.op.instance_name,
3461
                     relocate_from=[self.sec_node])
3462

    
3463
    ial.Run(self.op.iallocator)
3464

    
3465
    if not ial.success:
3466
      raise errors.OpPrereqError("Can't compute nodes using"
3467
                                 " iallocator '%s': %s" % (self.op.iallocator,
3468
                                                           ial.info))
3469
    if len(ial.nodes) != ial.required_nodes:
3470
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3471
                                 " of nodes (%s), required %s" %
3472
                                 (len(ial.nodes), ial.required_nodes))
3473
    self.op.remote_node = ial.nodes[0]
3474
    logger.ToStdout("Selected new secondary for the instance: %s" %
3475
                    self.op.remote_node)
3476

    
3477
  def BuildHooksEnv(self):
3478
    """Build hooks env.
3479

3480
    This runs on the master, the primary and all the secondaries.
3481

3482
    """
3483
    env = {
3484
      "MODE": self.op.mode,
3485
      "NEW_SECONDARY": self.op.remote_node,
3486
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3487
      }
3488
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3489
    nl = [
3490
      self.sstore.GetMasterNode(),
3491
      self.instance.primary_node,
3492
      ]
3493
    if self.op.remote_node is not None:
3494
      nl.append(self.op.remote_node)
3495
    return env, nl, nl
3496

    
3497
  def CheckPrereq(self):
3498
    """Check prerequisites.
3499

3500
    This checks that the instance is in the cluster.
3501

3502
    """
3503
    if not hasattr(self.op, "remote_node"):
3504
      self.op.remote_node = None
3505

    
3506
    instance = self.cfg.GetInstanceInfo(
3507
      self.cfg.ExpandInstanceName(self.op.instance_name))
3508
    if instance is None:
3509
      raise errors.OpPrereqError("Instance '%s' not known" %
3510
                                 self.op.instance_name)
3511
    self.instance = instance
3512
    self.op.instance_name = instance.name
3513

    
3514
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3515
      raise errors.OpPrereqError("Instance's disk layout is not"
3516
                                 " network mirrored.")
3517

    
3518
    if len(instance.secondary_nodes) != 1:
3519
      raise errors.OpPrereqError("The instance has a strange layout,"
3520
                                 " expected one secondary but found %d" %
3521
                                 len(instance.secondary_nodes))
3522

    
3523
    self.sec_node = instance.secondary_nodes[0]
3524

    
3525
    ia_name = getattr(self.op, "iallocator", None)
3526
    if ia_name is not None:
3527
      if self.op.remote_node is not None:
3528
        raise errors.OpPrereqError("Give either the iallocator or the new"
3529
                                   " secondary, not both")
3530
      self.op.remote_node = self._RunAllocator()
3531

    
3532
    remote_node = self.op.remote_node
3533
    if remote_node is not None:
3534
      remote_node = self.cfg.ExpandNodeName(remote_node)
3535
      if remote_node is None:
3536
        raise errors.OpPrereqError("Node '%s' not known" %
3537
                                   self.op.remote_node)
3538
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3539
    else:
3540
      self.remote_node_info = None
3541
    if remote_node == instance.primary_node:
3542
      raise errors.OpPrereqError("The specified node is the primary node of"
3543
                                 " the instance.")
3544
    elif remote_node == self.sec_node:
3545
      if self.op.mode == constants.REPLACE_DISK_SEC:
3546
        # this is for DRBD8, where we can't execute the same mode of
3547
        # replacement as for drbd7 (no different port allocated)
3548
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3549
                                   " replacement")
3550
    if instance.disk_template == constants.DT_DRBD8:
3551
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3552
          remote_node is not None):
3553
        # switch to replace secondary mode
3554
        self.op.mode = constants.REPLACE_DISK_SEC
3555

    
3556
      if self.op.mode == constants.REPLACE_DISK_ALL:
3557
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3558
                                   " secondary disk replacement, not"
3559
                                   " both at once")
3560
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3561
        if remote_node is not None:
3562
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3563
                                     " the secondary while doing a primary"
3564
                                     " node disk replacement")
3565
        self.tgt_node = instance.primary_node
3566
        self.oth_node = instance.secondary_nodes[0]
3567
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3568
        self.new_node = remote_node # this can be None, in which case
3569
                                    # we don't change the secondary
3570
        self.tgt_node = instance.secondary_nodes[0]
3571
        self.oth_node = instance.primary_node
3572
      else:
3573
        raise errors.ProgrammerError("Unhandled disk replace mode")
3574

    
3575
    for name in self.op.disks:
3576
      if instance.FindDisk(name) is None:
3577
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3578
                                   (name, instance.name))
3579
    self.op.remote_node = remote_node
3580

    
3581
  def _ExecD8DiskOnly(self, feedback_fn):
3582
    """Replace a disk on the primary or secondary for dbrd8.
3583

3584
    The algorithm for replace is quite complicated:
3585
      - for each disk to be replaced:
3586
        - create new LVs on the target node with unique names
3587
        - detach old LVs from the drbd device
3588
        - rename old LVs to name_replaced.<time_t>
3589
        - rename new LVs to old LVs
3590
        - attach the new LVs (with the old names now) to the drbd device
3591
      - wait for sync across all devices
3592
      - for each modified disk:
3593
        - remove old LVs (which have the name name_replaces.<time_t>)
3594

3595
    Failures are not very well handled.
3596

3597
    """
3598
    steps_total = 6
3599
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3600
    instance = self.instance
3601
    iv_names = {}
3602
    vgname = self.cfg.GetVGName()
3603
    # start of work
3604
    cfg = self.cfg
3605
    tgt_node = self.tgt_node
3606
    oth_node = self.oth_node
3607

    
3608
    # Step: check device activation
3609
    self.proc.LogStep(1, steps_total, "check device existence")
3610
    info("checking volume groups")
3611
    my_vg = cfg.GetVGName()
3612
    results = rpc.call_vg_list([oth_node, tgt_node])
3613
    if not results:
3614
      raise errors.OpExecError("Can't list volume groups on the nodes")
3615
    for node in oth_node, tgt_node:
3616
      res = results.get(node, False)
3617
      if not res or my_vg not in res:
3618
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3619
                                 (my_vg, node))
3620
    for dev in instance.disks:
3621
      if not dev.iv_name in self.op.disks:
3622
        continue
3623
      for node in tgt_node, oth_node:
3624
        info("checking %s on %s" % (dev.iv_name, node))
3625
        cfg.SetDiskID(dev, node)
3626
        if not rpc.call_blockdev_find(node, dev):
3627
          raise errors.OpExecError("Can't find device %s on node %s" %
3628
                                   (dev.iv_name, node))
3629

    
3630
    # Step: check other node consistency
3631
    self.proc.LogStep(2, steps_total, "check peer consistency")
3632
    for dev in instance.disks:
3633
      if not dev.iv_name in self.op.disks:
3634
        continue
3635
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3636
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3637
                                   oth_node==instance.primary_node):
3638
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3639
                                 " to replace disks on this node (%s)" %
3640
                                 (oth_node, tgt_node))
3641

    
3642
    # Step: create new storage
3643
    self.proc.LogStep(3, steps_total, "allocate new storage")
3644
    for dev in instance.disks:
3645
      if not dev.iv_name in self.op.disks:
3646
        continue
3647
      size = dev.size
3648
      cfg.SetDiskID(dev, tgt_node)
3649
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3650
      names = _GenerateUniqueNames(cfg, lv_names)
3651
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3652
                             logical_id=(vgname, names[0]))
3653
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3654
                             logical_id=(vgname, names[1]))
3655
      new_lvs = [lv_data, lv_meta]
3656
      old_lvs = dev.children
3657
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3658
      info("creating new local storage on %s for %s" %
3659
           (tgt_node, dev.iv_name))
3660
      # since we *always* want to create this LV, we use the
3661
      # _Create...OnPrimary (which forces the creation), even if we
3662
      # are talking about the secondary node
3663
      for new_lv in new_lvs:
3664
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3665
                                        _GetInstanceInfoText(instance)):
3666
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3667
                                   " node '%s'" %
3668
                                   (new_lv.logical_id[1], tgt_node))
3669

    
3670
    # Step: for each lv, detach+rename*2+attach
3671
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3672
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3673
      info("detaching %s drbd from local storage" % dev.iv_name)
3674
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3675
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3676
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3677
      #dev.children = []
3678
      #cfg.Update(instance)
3679

    
3680
      # ok, we created the new LVs, so now we know we have the needed
3681
      # storage; as such, we proceed on the target node to rename
3682
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3683
      # using the assumption that logical_id == physical_id (which in
3684
      # turn is the unique_id on that node)
3685

    
3686
      # FIXME(iustin): use a better name for the replaced LVs
3687
      temp_suffix = int(time.time())
3688
      ren_fn = lambda d, suff: (d.physical_id[0],
3689
                                d.physical_id[1] + "_replaced-%s" % suff)
3690
      # build the rename list based on what LVs exist on the node
3691
      rlist = []
3692
      for to_ren in old_lvs:
3693
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3694
        if find_res is not None: # device exists
3695
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3696

    
3697
      info("renaming the old LVs on the target node")
3698
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3699
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3700
      # now we rename the new LVs to the old LVs
3701
      info("renaming the new LVs on the target node")
3702
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3703
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3704
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3705

    
3706
      for old, new in zip(old_lvs, new_lvs):
3707
        new.logical_id = old.logical_id
3708
        cfg.SetDiskID(new, tgt_node)
3709

    
3710
      for disk in old_lvs:
3711
        disk.logical_id = ren_fn(disk, temp_suffix)
3712
        cfg.SetDiskID(disk, tgt_node)
3713

    
3714
      # now that the new lvs have the old name, we can add them to the device
3715
      info("adding new mirror component on %s" % tgt_node)
3716
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3717
        for new_lv in new_lvs:
3718
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3719
            warning("Can't rollback device %s", hint="manually cleanup unused"
3720
                    " logical volumes")
3721
        raise errors.OpExecError("Can't add local storage to drbd")
3722

    
3723
      dev.children = new_lvs
3724
      cfg.Update(instance)
3725

    
3726
    # Step: wait for sync
3727

    
3728
    # this can fail as the old devices are degraded and _WaitForSync
3729
    # does a combined result over all disks, so we don't check its
3730
    # return value
3731
    self.proc.LogStep(5, steps_total, "sync devices")
3732
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3733

    
3734
    # so check manually all the devices
3735
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3736
      cfg.SetDiskID(dev, instance.primary_node)
3737
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3738
      if is_degr:
3739
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3740

    
3741
    # Step: remove old storage
3742
    self.proc.LogStep(6, steps_total, "removing old storage")
3743
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3744
      info("remove logical volumes for %s" % name)
3745
      for lv in old_lvs:
3746
        cfg.SetDiskID(lv, tgt_node)
3747
        if not rpc.call_blockdev_remove(tgt_node, lv):
3748
          warning("Can't remove old LV", hint="manually remove unused LVs")
3749
          continue
3750

    
3751
  def _ExecD8Secondary(self, feedback_fn):
3752
    """Replace the secondary node for drbd8.
3753

3754
    The algorithm for replace is quite complicated:
3755
      - for all disks of the instance:
3756
        - create new LVs on the new node with same names
3757
        - shutdown the drbd device on the old secondary
3758
        - disconnect the drbd network on the primary
3759
        - create the drbd device on the new secondary
3760
        - network attach the drbd on the primary, using an artifice:
3761
          the drbd code for Attach() will connect to the network if it
3762
          finds a device which is connected to the good local disks but
3763
          not network enabled
3764
      - wait for sync across all devices
3765
      - remove all disks from the old secondary
3766

3767
    Failures are not very well handled.
3768

3769
    """
3770
    steps_total = 6
3771
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3772
    instance = self.instance
3773
    iv_names = {}
3774
    vgname = self.cfg.GetVGName()
3775
    # start of work
3776
    cfg = self.cfg
3777
    old_node = self.tgt_node
3778
    new_node = self.new_node
3779
    pri_node = instance.primary_node
3780

    
3781
    # Step: check device activation
3782
    self.proc.LogStep(1, steps_total, "check device existence")
3783
    info("checking volume groups")
3784
    my_vg = cfg.GetVGName()
3785
    results = rpc.call_vg_list([pri_node, new_node])
3786
    if not results:
3787
      raise errors.OpExecError("Can't list volume groups on the nodes")
3788
    for node in pri_node, new_node:
3789
      res = results.get(node, False)
3790
      if not res or my_vg not in res:
3791
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3792
                                 (my_vg, node))
3793
    for dev in instance.disks:
3794
      if not dev.iv_name in self.op.disks:
3795
        continue
3796
      info("checking %s on %s" % (dev.iv_name, pri_node))
3797
      cfg.SetDiskID(dev, pri_node)
3798
      if not rpc.call_blockdev_find(pri_node, dev):
3799
        raise errors.OpExecError("Can't find device %s on node %s" %
3800
                                 (dev.iv_name, pri_node))
3801

    
3802
    # Step: check other node consistency
3803
    self.proc.LogStep(2, steps_total, "check peer consistency")
3804
    for dev in instance.disks:
3805
      if not dev.iv_name in self.op.disks:
3806
        continue
3807
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3808
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3809
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3810
                                 " unsafe to replace the secondary" %
3811
                                 pri_node)
3812

    
3813
    # Step: create new storage
3814
    self.proc.LogStep(3, steps_total, "allocate new storage")
3815
    for dev in instance.disks:
3816
      size = dev.size
3817
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3818
      # since we *always* want to create this LV, we use the
3819
      # _Create...OnPrimary (which forces the creation), even if we
3820
      # are talking about the secondary node
3821
      for new_lv in dev.children:
3822
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3823
                                        _GetInstanceInfoText(instance)):
3824
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3825
                                   " node '%s'" %
3826
                                   (new_lv.logical_id[1], new_node))
3827

    
3828
      iv_names[dev.iv_name] = (dev, dev.children)
3829

    
3830
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3831
    for dev in instance.disks:
3832
      size = dev.size
3833
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3834
      # create new devices on new_node
3835
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3836
                              logical_id=(pri_node, new_node,
3837
                                          dev.logical_id[2]),
3838
                              children=dev.children)
3839
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3840
                                        new_drbd, False,
3841
                                      _GetInstanceInfoText(instance)):
3842
        raise errors.OpExecError("Failed to create new DRBD on"
3843
                                 " node '%s'" % new_node)
3844

    
3845
    for dev in instance.disks:
3846
      # we have new devices, shutdown the drbd on the old secondary
3847
      info("shutting down drbd for %s on old node" % dev.iv_name)
3848
      cfg.SetDiskID(dev, old_node)
3849
      if not rpc.call_blockdev_shutdown(old_node, dev):
3850
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3851
                hint="Please cleanup this device manually as soon as possible")
3852

    
3853
    info("detaching primary drbds from the network (=> standalone)")
3854
    done = 0
3855
    for dev in instance.disks:
3856
      cfg.SetDiskID(dev, pri_node)
3857
      # set the physical (unique in bdev terms) id to None, meaning
3858
      # detach from network
3859
      dev.physical_id = (None,) * len(dev.physical_id)
3860
      # and 'find' the device, which will 'fix' it to match the
3861
      # standalone state
3862
      if rpc.call_blockdev_find(pri_node, dev):
3863
        done += 1
3864
      else:
3865
        warning("Failed to detach drbd %s from network, unusual case" %
3866
                dev.iv_name)
3867

    
3868
    if not done:
3869
      # no detaches succeeded (very unlikely)
3870
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3871

    
3872
    # if we managed to detach at least one, we update all the disks of
3873
    # the instance to point to the new secondary
3874
    info("updating instance configuration")
3875
    for dev in instance.disks:
3876
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3877
      cfg.SetDiskID(dev, pri_node)
3878
    cfg.Update(instance)
3879

    
3880
    # and now perform the drbd attach
3881
    info("attaching primary drbds to new secondary (standalone => connected)")
3882
    failures = []
3883
    for dev in instance.disks:
3884
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3885
      # since the attach is smart, it's enough to 'find' the device,
3886
      # it will automatically activate the network, if the physical_id
3887
      # is correct
3888
      cfg.SetDiskID(dev, pri_node)
3889
      if not rpc.call_blockdev_find(pri_node, dev):
3890
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3891
                "please do a gnt-instance info to see the status of disks")
3892

    
3893
    # this can fail as the old devices are degraded and _WaitForSync
3894
    # does a combined result over all disks, so we don't check its
3895
    # return value
3896
    self.proc.LogStep(5, steps_total, "sync devices")
3897
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3898

    
3899
    # so check manually all the devices
3900
    for name, (dev, old_lvs) in iv_names.iteritems():
3901
      cfg.SetDiskID(dev, pri_node)
3902
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3903
      if is_degr:
3904
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3905

    
3906
    self.proc.LogStep(6, steps_total, "removing old storage")
3907
    for name, (dev, old_lvs) in iv_names.iteritems():
3908
      info("remove logical volumes for %s" % name)
3909
      for lv in old_lvs:
3910
        cfg.SetDiskID(lv, old_node)
3911
        if not rpc.call_blockdev_remove(old_node, lv):
3912
          warning("Can't remove LV on old secondary",
3913
                  hint="Cleanup stale volumes by hand")
3914

    
3915
  def Exec(self, feedback_fn):
3916
    """Execute disk replacement.
3917

3918
    This dispatches the disk replacement to the appropriate handler.
3919

3920
    """
3921
    instance = self.instance
3922

    
3923
    # Activate the instance disks if we're replacing them on a down instance
3924
    if instance.status == "down":
3925
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3926
      self.proc.ChainOpCode(op)
3927

    
3928
    if instance.disk_template == constants.DT_DRBD8:
3929
      if self.op.remote_node is None:
3930
        fn = self._ExecD8DiskOnly
3931
      else:
3932
        fn = self._ExecD8Secondary
3933
    else:
3934
      raise errors.ProgrammerError("Unhandled disk replacement case")
3935

    
3936
    ret = fn(feedback_fn)
3937

    
3938
    # Deactivate the instance disks if we're replacing them on a down instance
3939
    if instance.status == "down":
3940
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3941
      self.proc.ChainOpCode(op)
3942

    
3943
    return ret
3944

    
3945

    
3946
class LUGrowDisk(LogicalUnit):
3947
  """Grow a disk of an instance.
3948

3949
  """
3950
  HPATH = "disk-grow"
3951
  HTYPE = constants.HTYPE_INSTANCE
3952
  _OP_REQP = ["instance_name", "disk", "amount"]
3953

    
3954
  def BuildHooksEnv(self):
3955
    """Build hooks env.
3956

3957
    This runs on the master, the primary and all the secondaries.
3958

3959
    """
3960
    env = {
3961
      "DISK": self.op.disk,
3962
      "AMOUNT": self.op.amount,
3963
      }
3964
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3965
    nl = [
3966
      self.sstore.GetMasterNode(),
3967
      self.instance.primary_node,
3968
      ]
3969
    return env, nl, nl
3970

    
3971
  def CheckPrereq(self):
3972
    """Check prerequisites.
3973

3974
    This checks that the instance is in the cluster.
3975

3976
    """
3977
    instance = self.cfg.GetInstanceInfo(
3978
      self.cfg.ExpandInstanceName(self.op.instance_name))
3979
    if instance is None:
3980
      raise errors.OpPrereqError("Instance '%s' not known" %
3981
                                 self.op.instance_name)
3982
    self.instance = instance
3983
    self.op.instance_name = instance.name
3984

    
3985
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3986
      raise errors.OpPrereqError("Instance's disk layout does not support"
3987
                                 " growing.")
3988

    
3989
    if instance.FindDisk(self.op.disk) is None:
3990
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3991
                                 (self.op.disk, instance.name))
3992

    
3993
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3994
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3995
    for node in nodenames:
3996
      info = nodeinfo.get(node, None)
3997
      if not info:
3998
        raise errors.OpPrereqError("Cannot get current information"
3999
                                   " from node '%s'" % node)
4000
      vg_free = info.get('vg_free', None)
4001
      if not isinstance(vg_free, int):
4002
        raise errors.OpPrereqError("Can't compute free disk space on"
4003
                                   " node %s" % node)
4004
      if self.op.amount > info['vg_free']:
4005
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4006
                                   " %d MiB available, %d MiB required" %
4007
                                   (node, info['vg_free'], self.op.amount))
4008

    
4009
  def Exec(self, feedback_fn):
4010
    """Execute disk grow.
4011

4012
    """
4013
    instance = self.instance
4014
    disk = instance.FindDisk(self.op.disk)
4015
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4016
      self.cfg.SetDiskID(disk, node)
4017
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4018
      if not result or not isinstance(result, tuple) or len(result) != 2:
4019
        raise errors.OpExecError("grow request failed to node %s" % node)
4020
      elif not result[0]:
4021
        raise errors.OpExecError("grow request failed to node %s: %s" %
4022
                                 (node, result[1]))
4023
    disk.RecordGrow(self.op.amount)
4024
    self.cfg.Update(instance)
4025
    return
4026

    
4027

    
4028
class LUQueryInstanceData(NoHooksLU):
4029
  """Query runtime instance data.
4030

4031
  """
4032
  _OP_REQP = ["instances"]
4033

    
4034
  def CheckPrereq(self):
4035
    """Check prerequisites.
4036

4037
    This only checks the optional instance list against the existing names.
4038

4039
    """
4040
    if not isinstance(self.op.instances, list):
4041
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4042
    if self.op.instances:
4043
      self.wanted_instances = []
4044
      names = self.op.instances
4045
      for name in names:
4046
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4047
        if instance is None:
4048
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4049
        self.wanted_instances.append(instance)
4050
    else:
4051
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4052
                               in self.cfg.GetInstanceList()]
4053
    return
4054

    
4055

    
4056
  def _ComputeDiskStatus(self, instance, snode, dev):
4057
    """Compute block device status.
4058

4059
    """
4060
    self.cfg.SetDiskID(dev, instance.primary_node)
4061
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4062
    if dev.dev_type in constants.LDS_DRBD:
4063
      # we change the snode then (otherwise we use the one passed in)
4064
      if dev.logical_id[0] == instance.primary_node:
4065
        snode = dev.logical_id[1]
4066
      else:
4067
        snode = dev.logical_id[0]
4068

    
4069
    if snode:
4070
      self.cfg.SetDiskID(dev, snode)
4071
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4072
    else:
4073
      dev_sstatus = None
4074

    
4075
    if dev.children:
4076
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4077
                      for child in dev.children]
4078
    else:
4079
      dev_children = []
4080

    
4081
    data = {
4082
      "iv_name": dev.iv_name,
4083
      "dev_type": dev.dev_type,
4084
      "logical_id": dev.logical_id,
4085
      "physical_id": dev.physical_id,
4086
      "pstatus": dev_pstatus,
4087
      "sstatus": dev_sstatus,
4088
      "children": dev_children,
4089
      }
4090

    
4091
    return data
4092

    
4093
  def Exec(self, feedback_fn):
4094
    """Gather and return data"""
4095
    result = {}
4096
    for instance in self.wanted_instances:
4097
      remote_info = rpc.call_instance_info(instance.primary_node,
4098
                                                instance.name)
4099
      if remote_info and "state" in remote_info:
4100
        remote_state = "up"
4101
      else:
4102
        remote_state = "down"
4103
      if instance.status == "down":
4104
        config_state = "down"
4105
      else:
4106
        config_state = "up"
4107

    
4108
      disks = [self._ComputeDiskStatus(instance, None, device)
4109
               for device in instance.disks]
4110

    
4111
      idict = {
4112
        "name": instance.name,
4113
        "config_state": config_state,
4114
        "run_state": remote_state,
4115
        "pnode": instance.primary_node,
4116
        "snodes": instance.secondary_nodes,
4117
        "os": instance.os,
4118
        "memory": instance.memory,
4119
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4120
        "disks": disks,
4121
        "vcpus": instance.vcpus,
4122
        }
4123

    
4124
      htkind = self.sstore.GetHypervisorType()
4125
      if htkind == constants.HT_XEN_PVM30:
4126
        idict["kernel_path"] = instance.kernel_path
4127
        idict["initrd_path"] = instance.initrd_path
4128

    
4129
      if htkind == constants.HT_XEN_HVM31:
4130
        idict["hvm_boot_order"] = instance.hvm_boot_order
4131
        idict["hvm_acpi"] = instance.hvm_acpi
4132
        idict["hvm_pae"] = instance.hvm_pae
4133
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4134

    
4135
      if htkind in constants.HTS_REQ_PORT:
4136
        idict["vnc_bind_address"] = instance.vnc_bind_address
4137
        idict["network_port"] = instance.network_port
4138

    
4139
      result[instance.name] = idict
4140

    
4141
    return result
4142

    
4143

    
4144
class LUSetInstanceParams(LogicalUnit):
4145
  """Modifies an instances's parameters.
4146

4147
  """
4148
  HPATH = "instance-modify"
4149
  HTYPE = constants.HTYPE_INSTANCE
4150
  _OP_REQP = ["instance_name"]
4151
  REQ_BGL = False
4152

    
4153
  def ExpandNames(self):
4154
    self._ExpandAndLockInstance()
4155

    
4156
  def BuildHooksEnv(self):
4157
    """Build hooks env.
4158

4159
    This runs on the master, primary and secondaries.
4160

4161
    """
4162
    args = dict()
4163
    if self.mem:
4164
      args['memory'] = self.mem
4165
    if self.vcpus:
4166
      args['vcpus'] = self.vcpus
4167
    if self.do_ip or self.do_bridge or self.mac:
4168
      if self.do_ip:
4169
        ip = self.ip
4170
      else:
4171
        ip = self.instance.nics[0].ip
4172
      if self.bridge:
4173
        bridge = self.bridge
4174
      else:
4175
        bridge = self.instance.nics[0].bridge
4176
      if self.mac:
4177
        mac = self.mac
4178
      else:
4179
        mac = self.instance.nics[0].mac
4180
      args['nics'] = [(ip, bridge, mac)]
4181
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4182
    nl = [self.sstore.GetMasterNode(),
4183
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4184
    return env, nl, nl
4185

    
4186
  def CheckPrereq(self):
4187
    """Check prerequisites.
4188

4189
    This only checks the instance list against the existing names.
4190

4191
    """
4192
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4193
    # a separate CheckArguments function, if we implement one, so the operation
4194
    # can be aborted without waiting for any lock, should it have an error...
4195
    self.mem = getattr(self.op, "mem", None)
4196
    self.vcpus = getattr(self.op, "vcpus", None)
4197
    self.ip = getattr(self.op, "ip", None)
4198
    self.mac = getattr(self.op, "mac", None)
4199
    self.bridge = getattr(self.op, "bridge", None)
4200
    self.kernel_path = getattr(self.op, "kernel_path", None)
4201
    self.initrd_path = getattr(self.op, "initrd_path", None)
4202
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4203
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4204
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4205
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4206
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4207
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4208
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4209
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4210
                 self.vnc_bind_address]
4211
    if all_parms.count(None) == len(all_parms):
4212
      raise errors.OpPrereqError("No changes submitted")
4213
    if self.mem is not None:
4214
      try:
4215
        self.mem = int(self.mem)
4216
      except ValueError, err:
4217
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4218
    if self.vcpus is not None:
4219
      try:
4220
        self.vcpus = int(self.vcpus)
4221
      except ValueError, err:
4222
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4223
    if self.ip is not None:
4224
      self.do_ip = True
4225
      if self.ip.lower() == "none":
4226
        self.ip = None
4227
      else:
4228
        if not utils.IsValidIP(self.ip):
4229
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4230
    else:
4231
      self.do_ip = False
4232
    self.do_bridge = (self.bridge is not None)
4233
    if self.mac is not None:
4234
      if self.cfg.IsMacInUse(self.mac):
4235
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4236
                                   self.mac)
4237
      if not utils.IsValidMac(self.mac):
4238
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4239

    
4240
    if self.kernel_path is not None:
4241
      self.do_kernel_path = True
4242
      if self.kernel_path == constants.VALUE_NONE:
4243
        raise errors.OpPrereqError("Can't set instance to no kernel")
4244

    
4245
      if self.kernel_path != constants.VALUE_DEFAULT:
4246
        if not os.path.isabs(self.kernel_path):
4247
          raise errors.OpPrereqError("The kernel path must be an absolute"
4248
                                    " filename")
4249
    else:
4250
      self.do_kernel_path = False
4251

    
4252
    if self.initrd_path is not None:
4253
      self.do_initrd_path = True
4254
      if self.initrd_path not in (constants.VALUE_NONE,
4255
                                  constants.VALUE_DEFAULT):
4256
        if not os.path.isabs(self.initrd_path):
4257
          raise errors.OpPrereqError("The initrd path must be an absolute"
4258
                                    " filename")
4259
    else:
4260
      self.do_initrd_path = False
4261

    
4262
    # boot order verification
4263
    if self.hvm_boot_order is not None:
4264
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4265
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4266
          raise errors.OpPrereqError("invalid boot order specified,"
4267
                                     " must be one or more of [acdn]"
4268
                                     " or 'default'")
4269

    
4270
    # hvm_cdrom_image_path verification
4271
    if self.op.hvm_cdrom_image_path is not None:
4272
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
4273
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
4274
                                   " be an absolute path or None, not %s" %
4275
                                   self.op.hvm_cdrom_image_path)
4276
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
4277
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
4278
                                   " regular file or a symlink pointing to"
4279
                                   " an existing regular file, not %s" %
4280
                                   self.op.hvm_cdrom_image_path)
4281

    
4282
    # vnc_bind_address verification
4283
    if self.op.vnc_bind_address is not None:
4284
      if not utils.IsValidIP(self.op.vnc_bind_address):
4285
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4286
                                   " like a valid IP address" %
4287
                                   self.op.vnc_bind_address)
4288

    
4289
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4290
    assert self.instance is not None, \
4291
      "Cannot retrieve locked instance %s" % self.op.instance_name
4292
    return
4293

    
4294
  def Exec(self, feedback_fn):
4295
    """Modifies an instance.
4296

4297
    All parameters take effect only at the next restart of the instance.
4298
    """
4299
    result = []
4300
    instance = self.instance
4301
    if self.mem:
4302
      instance.memory = self.mem
4303
      result.append(("mem", self.mem))
4304
    if self.vcpus:
4305
      instance.vcpus = self.vcpus
4306
      result.append(("vcpus",  self.vcpus))
4307
    if self.do_ip:
4308
      instance.nics[0].ip = self.ip
4309
      result.append(("ip", self.ip))
4310
    if self.bridge:
4311
      instance.nics[0].bridge = self.bridge
4312
      result.append(("bridge", self.bridge))
4313
    if self.mac:
4314
      instance.nics[0].mac = self.mac
4315
      result.append(("mac", self.mac))
4316
    if self.do_kernel_path:
4317
      instance.kernel_path = self.kernel_path
4318
      result.append(("kernel_path", self.kernel_path))
4319
    if self.do_initrd_path:
4320
      instance.initrd_path = self.initrd_path
4321
      result.append(("initrd_path", self.initrd_path))
4322
    if self.hvm_boot_order:
4323
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4324
        instance.hvm_boot_order = None
4325
      else:
4326
        instance.hvm_boot_order = self.hvm_boot_order
4327
      result.append(("hvm_boot_order", self.hvm_boot_order))
4328
    if self.hvm_acpi:
4329
      instance.hvm_acpi = self.hvm_acpi
4330
      result.append(("hvm_acpi", self.hvm_acpi))
4331
    if self.hvm_pae:
4332
      instance.hvm_pae = self.hvm_pae
4333
      result.append(("hvm_pae", self.hvm_pae))
4334
    if self.hvm_cdrom_image_path:
4335
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4336
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4337
    if self.vnc_bind_address:
4338
      instance.vnc_bind_address = self.vnc_bind_address
4339
      result.append(("vnc_bind_address", self.vnc_bind_address))
4340

    
4341
    self.cfg.Update(instance)
4342

    
4343
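
    # Illustrative note (editor's addition, not part of the original code):
    # the list returned below holds (parameter, new_value) pairs, e.g.
    #   [("mem", 512), ("bridge", "xen-br0")]
    # which the command-line layer can report back to the user one per line.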
    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
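    # Editor's note: the RPC below returns a mapping of the form
    #   {"node1.example.com": ["exported-instance-name", ...], ...}
    # i.e. for every queried node, the list of instance exports found there.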
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []
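
    # Editor's note: the loop below only snapshots and exports the first
    # disk (iv_name "sda"); any additional disks are not included in the
    # export.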
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
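    # Editor's note: each hit is a (path, tag) pair, for example
    #   ("/instances/instance1.example.com", "web")
    # where the path encodes the kind and name of the tagged object.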
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
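  # Editor's sketch (not in the original module): assuming the usual mapping
  # between opcodes and LUs, the corresponding opcode would be built roughly
  # as opcodes.OpTestDelay(duration=10.0, on_master=True, on_nodes=[]),
  # using the attributes listed in _OP_REQP below.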
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4687
  REQ_BGL = False
4688

    
4689
  def ExpandNames(self):
4690
    """Expand names and set required locks.
4691

4692
    This expands the node list, if any.
4693

4694
    """
4695
    self.needed_locks = {}
4696
    if self.op.on_nodes:
4697
      # _GetWantedNodes can be used here, but is not always appropriate to use
4698
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4699
      # more information.
4700
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4701
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4702

    
4703
  def CheckPrereq(self):
4704
    """Check prerequisites.
4705

4706
    """
4707

    
4708
  def Exec(self, feedback_fn):
4709
    """Do the actual sleep.
4710

4711
    """
4712
    if self.op.on_master:
4713
      if not utils.TestDelay(self.op.duration):
4714
        raise errors.OpExecError("Error during master delay test")
4715
    if self.op.on_nodes:
4716
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4717
      if not result:
4718
        raise errors.OpExecError("Complete failure from rpc call")
4719
      for node, node_result in result.items():
4720
        if not node_result:
4721
          raise errors.OpExecError("Failure during rpc call to node %s,"
4722
                                   " result: %s" % (node, node_result))
4723

    
4724

    
4725
class IAllocator(object):
4726
  """IAllocator framework.
4727

4728
  An IAllocator instance has three sets of attributes:
4729
    - cfg/sstore that are needed to query the cluster
4730
    - input data (all members of the _KEYS class attribute are required)
4731
    - four buffer attributes (in|out_data|text), that represent the
4732
      input (to the external script) in text and data structure format,
4733
      and the output from it, again in two formats
4734
    - the result variables from the script (success, info, nodes) for
4735
      easy usage
4736

4737
  """
4738
  _ALLO_KEYS = [
4739
    "mem_size", "disks", "disk_template",
4740
    "os", "tags", "nics", "vcpus",
4741
    ]
4742
  _RELO_KEYS = [
4743
    "relocate_from",
4744
    ]
4745

    
4746
  def __init__(self, cfg, sstore, mode, name, **kwargs):
4747
    self.cfg = cfg
4748
    self.sstore = sstore
4749
    # init buffer variables
4750
    self.in_text = self.out_text = self.in_data = self.out_data = None
4751
    # init all input fields so that pylint is happy
4752
    self.mode = mode
4753
    self.name = name
4754
    self.mem_size = self.disks = self.disk_template = None
4755
    self.os = self.tags = self.nics = self.vcpus = None
4756
    self.relocate_from = None
4757
    # computed fields
4758
    self.required_nodes = None
4759
    # init result fields
4760
    self.success = self.info = self.nodes = None
4761
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4762
      keyset = self._ALLO_KEYS
4763
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4764
      keyset = self._RELO_KEYS
4765
    else:
4766
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4767
                                   " IAllocator" % self.mode)
4768
    for key in kwargs:
4769
      if key not in keyset:
4770
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
4771
                                     " IAllocator" % key)
4772
      setattr(self, key, kwargs[key])
4773
    for key in keyset:
4774
      if key not in kwargs:
4775
        raise errors.ProgrammerError("Missing input parameter '%s' to"
4776
                                     " IAllocator" % key)
4777
    self._BuildInputData()
4778

    
4779
  def _ComputeClusterData(self):
4780
    """Compute the generic allocator input data.
4781

4782
    This is the data that is independent of the actual operation.
4783

4784
    """
4785
    cfg = self.cfg
4786
    # cluster data
4787
    data = {
4788
      "version": 1,
4789
      "cluster_name": self.sstore.GetClusterName(),
4790
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4791
      "hypervisor_type": self.sstore.GetHypervisorType(),
4792
      # we don't have job IDs
4793
      }
4794

    
4795
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4796

    
4797
    # node data
4798
    node_results = {}
4799
    node_list = cfg.GetNodeList()
4800
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4801
    for nname in node_list:
4802
      ninfo = cfg.GetNodeInfo(nname)
4803
      if nname not in node_data or not isinstance(node_data[nname], dict):
4804
        raise errors.OpExecError("Can't get data for node %s" % nname)
4805
      remote_info = node_data[nname]
4806
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
4807
                   'vg_size', 'vg_free', 'cpu_total']:
4808
        if attr not in remote_info:
4809
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4810
                                   (nname, attr))
4811
        try:
4812
          remote_info[attr] = int(remote_info[attr])
4813
        except ValueError, err:
4814
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4815
                                   " %s" % (nname, attr, str(err)))
4816
      # compute memory used by primary instances
4817
      i_p_mem = i_p_up_mem = 0
4818
      for iinfo in i_list:
4819
        if iinfo.primary_node == nname:
4820
          i_p_mem += iinfo.memory
4821
          if iinfo.status == "up":
4822
            i_p_up_mem += iinfo.memory
4823

    
4824
      # compute memory used by instances
4825
      pnr = {
4826
        "tags": list(ninfo.GetTags()),
4827
        "total_memory": remote_info['memory_total'],
4828
        "reserved_memory": remote_info['memory_dom0'],
4829
        "free_memory": remote_info['memory_free'],
4830
        "i_pri_memory": i_p_mem,
4831
        "i_pri_up_memory": i_p_up_mem,
4832
        "total_disk": remote_info['vg_size'],
4833
        "free_disk": remote_info['vg_free'],
4834
        "primary_ip": ninfo.primary_ip,
4835
        "secondary_ip": ninfo.secondary_ip,
4836
        "total_cpus": remote_info['cpu_total'],
4837
        }
4838
      node_results[nname] = pnr
4839
    data["nodes"] = node_results
4840

    
4841
    # instance data
4842
    instance_data = {}
4843
    for iinfo in i_list:
4844
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4845
                  for n in iinfo.nics]
4846
      pir = {
4847
        "tags": list(iinfo.GetTags()),
4848
        "should_run": iinfo.status == "up",
4849
        "vcpus": iinfo.vcpus,
4850
        "memory": iinfo.memory,
4851
        "os": iinfo.os,
4852
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4853
        "nics": nic_data,
4854
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4855
        "disk_template": iinfo.disk_template,
4856
        }
4857
      instance_data[iinfo.name] = pir
4858

    
4859
    data["instances"] = instance_data
4860

    
4861
    self.in_data = data
4862

    
4863
  def _AddNewInstance(self):
4864
    """Add new instance data to allocator structure.
4865

4866
    This in combination with _AllocatorGetClusterData will create the
4867
    correct structure needed as input for the allocator.
4868

4869
    The checks for the completeness of the opcode must have already been
4870
    done.
4871

4872
    """
4873
    data = self.in_data
4874
    if len(self.disks) != 2:
4875
      raise errors.OpExecError("Only two-disk configurations supported")
4876

    
4877
    disk_space = _ComputeDiskSize(self.disk_template,
4878
                                  self.disks[0]["size"], self.disks[1]["size"])
4879

    
4880
    if self.disk_template in constants.DTS_NET_MIRROR:
4881
      self.required_nodes = 2
4882
    else:
4883
      self.required_nodes = 1
4884
    request = {
4885
      "type": "allocate",
4886
      "name": self.name,
4887
      "disk_template": self.disk_template,
4888
      "tags": self.tags,
4889
      "os": self.os,
4890
      "vcpus": self.vcpus,
4891
      "memory": self.mem_size,
4892
      "disks": self.disks,
4893
      "disk_space_total": disk_space,
4894
      "nics": self.nics,
4895
      "required_nodes": self.required_nodes,
4896
      }
4897
    data["request"] = request
4898

    
4899
  def _AddRelocateInstance(self):
4900
    """Add relocate instance data to allocator structure.
4901

4902
    This in combination with _IAllocatorGetClusterData will create the
4903
    correct structure needed as input for the allocator.
4904

4905
    The checks for the completeness of the opcode must have already been
4906
    done.
4907

4908
    """
4909
    instance = self.cfg.GetInstanceInfo(self.name)
4910
    if instance is None:
4911
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
4912
                                   " IAllocator" % self.name)
4913

    
4914
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4915
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4916

    
4917
    if len(instance.secondary_nodes) != 1:
4918
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
4919

    
4920
    self.required_nodes = 1
4921

    
4922
    disk_space = _ComputeDiskSize(instance.disk_template,
4923
                                  instance.disks[0].size,
4924
                                  instance.disks[1].size)
4925

    
4926
    request = {
4927
      "type": "relocate",
4928
      "name": self.name,
4929
      "disk_space_total": disk_space,
4930
      "required_nodes": self.required_nodes,
4931
      "relocate_from": self.relocate_from,
4932
      }
4933
    self.in_data["request"] = request
4934

    
4935
  def _BuildInputData(self):
4936
    """Build input data structures.
4937

4938
    """
4939
    self._ComputeClusterData()
4940

    
4941
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4942
      self._AddNewInstance()
4943
    else:
4944
      self._AddRelocateInstance()
4945

    
4946
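
    # Rough shape of the serialized input (editor's sketch, abbreviated):
    #   {"version": 1, "cluster_name": "...", "cluster_tags": [...],
    #    "hypervisor_type": "...",
    #    "nodes": {"node1": {"total_memory": ..., "free_disk": ..., ...}, ...},
    #    "instances": {"inst1": {"memory": ..., "nics": [...], ...}, ...},
    #    "request": {"type": "allocate" or "relocate", ...}}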
    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
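    # Expected allocator output, as validated above (editor's sketch):
    #   {"success": true or false,
    #    "info": "human readable reason",
    #    "nodes": ["node1.example.com", ...]}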

    
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result