1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_MASTER = True
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99

    
100
    if not self.cfg.IsCluster():
101
      raise errors.OpPrereqError("Cluster not initialized yet,"
102
                                 " use 'gnt-cluster init' first.")
103
    if self.REQ_MASTER:
104
      master = self.cfg.GetMasterNode()
105
      if master != utils.HostInfo().name:
106
        raise errors.OpPrereqError("Commands must be run on the master"
107
                                   " node %s" % master)
108

    
109
  def __GetSSH(self):
110
    """Returns the SshRunner object
111

112
    """
113
    if not self.__ssh:
114
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115
    return self.__ssh
116

    
117
  ssh = property(fget=__GetSSH)
118

    
119
  def ExpandNames(self):
120
    """Expand names for this LU.
121

122
    This method is called before starting to execute the opcode, and it should
123
    update all the parameters of the opcode to their canonical form (e.g. a
124
    short node name must be fully expanded after this method has successfully
125
    completed). This way locking, hooks, logging, etc. can work correctly.
126

127
    LUs which implement this method must also populate the self.needed_locks
128
    member, as a dict with lock levels as keys, and a list of needed lock names
129
    as values. Rules:
130
      - Use an empty dict if you don't need any lock
131
      - If you don't need any lock at a particular level omit that level
132
      - Don't put anything for the BGL level
133
      - If you want all locks at a level use locking.ALL_SET as a value
134

135
    If you need to share locks (rather than acquire them exclusively) at one
136
    level you can modify self.share_locks, setting a true value (usually 1) for
137
    that level. By default locks are not shared.
138

139
    Examples:
140
    # Acquire all nodes and one instance
141
    self.needed_locks = {
142
      locking.LEVEL_NODE: locking.ALL_SET,
143
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144
    }
145
    # Acquire just two nodes
146
    self.needed_locks = {
147
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148
    }
149
    # Acquire no locks
150
    self.needed_locks = {} # No, you can't leave it to the default value None
151

152
    """
153
    # The implementation of this method is mandatory only if the new LU is
154
    # concurrent, so that old LUs don't need to be changed all at the same
155
    # time.
156
    if self.REQ_BGL:
157
      self.needed_locks = {} # Exclusive LUs don't need locks.
158
    else:
159
      raise NotImplementedError
160

    
161
  def DeclareLocks(self, level):
162
    """Declare LU locking needs for a level
163

164
    While most LUs can just declare their locking needs at ExpandNames time,
165
    sometimes there's the need to calculate some locks after having acquired
166
    the ones before. This function is called just before acquiring locks at a
167
    particular level, but after acquiring the ones at lower levels, and permits
168
    such calculations. It can be used to modify self.needed_locks, and by
169
    default it does nothing.
170

171
    This function is only called if you have something already set in
172
    self.needed_locks for the level.
173

174
    @param level: Locking level which is going to be locked
175
    @type level: member of ganeti.locking.LEVELS
176

177
    """
178

    
179
  def CheckPrereq(self):
180
    """Check prerequisites for this LU.
181

182
    This method should check that the prerequisites for the execution
183
    of this LU are fulfilled. It can do internode communication, but
184
    it should be idempotent - no cluster or system changes are
185
    allowed.
186

187
    The method should raise errors.OpPrereqError in case something is
188
    not fulfilled. Its return value is ignored.
189

190
    This method should also update all the parameters of the opcode to
191
    their canonical form if it hasn't been done by ExpandNames before.
192

193
    """
194
    raise NotImplementedError
195

    
196
  def Exec(self, feedback_fn):
197
    """Execute the LU.
198

199
    This method should implement the actual work. It should raise
200
    errors.OpExecError for failures that are somewhat dealt with in
201
    code, or expected.
202

203
    """
204
    raise NotImplementedError
205

    
206
  def BuildHooksEnv(self):
207
    """Build hooks environment for this LU.
208

209
    This method should return a three-element tuple consisting of: a dict
210
    containing the environment that will be used for running the
211
    specific hook for this LU, a list of node names on which the hook
212
    should run before the execution, and a list of node names on which
213
    the hook should run after the execution.
214

215
    The keys of the dict must not have the 'GANETI_' prefix, as this will
216
    be handled in the hooks runner. Also note additional keys will be
217
    added by the hooks runner. If the LU doesn't define any
218
    environment, an empty dict (and not None) should be returned.
219

220
    If no nodes are needed, an empty list (and not None) should be returned.
221

222
    Note that if the HPATH for a LU class is None, this function will
223
    not be called.
224

225
    """
226
    raise NotImplementedError
227

    
228
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229
    """Notify the LU about the results of its hooks.
230

231
    This method is called every time a hooks phase is executed, and notifies
232
    the Logical Unit about the hooks' result. The LU can then use it to alter
233
    its result based on the hooks.  By default the method does nothing and the
234
    previous result is passed back unchanged but any LU can define it if it
235
    wants to use the local cluster hook-scripts somehow.
236

237
    Args:
238
      phase: the hooks phase that has just been run
239
      hook_results: the results of the multi-node hooks rpc call
240
      feedback_fn: function to send feedback back to the caller
241
      lu_result: the previous result this LU had, or None in the PRE phase.
242

243
    """
244
    return lu_result
245

    
246
  def _ExpandAndLockInstance(self):
247
    """Helper function to expand and lock an instance.
248

249
    Many LUs that work on an instance take its name in self.op.instance_name
250
    and need to expand it and then declare the expanded name for locking. This
251
    function does it, and then updates self.op.instance_name to the expanded
252
    name. It also initializes needed_locks as a dict, if this hasn't been done
253
    before.
254

255
    """
256
    if self.needed_locks is None:
257
      self.needed_locks = {}
258
    else:
259
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260
        "_ExpandAndLockInstance called with instance-level locks set"
261
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262
    if expanded_name is None:
263
      raise errors.OpPrereqError("Instance '%s' not known" %
264
                                  self.op.instance_name)
265
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266
    self.op.instance_name = expanded_name
267

    
268
  def _LockInstancesNodes(self, primary_only=False):
269
    """Helper function to declare instances' nodes for locking.
270

271
    This function should be called after locking one or more instances to lock
272
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273
    with all primary or secondary nodes for instances already locked and
274
    present in self.needed_locks[locking.LEVEL_INSTANCE].
275

276
    It should be called from DeclareLocks, and for safety only works if
277
    self.recalculate_locks[locking.LEVEL_NODE] is set.
278

279
    In the future it may grow parameters to just lock some instance's nodes, or
280
    to just lock primaries or secondary nodes, if needed.
281

282
    It should be called in DeclareLocks in a way similar to:
283

284
    if level == locking.LEVEL_NODE:
285
      self._LockInstancesNodes()
286

287
    @type primary_only: boolean
288
    @param primary_only: only lock primary nodes of locked instances
289

290
    """
291
    assert locking.LEVEL_NODE in self.recalculate_locks, \
292
      "_LockInstancesNodes helper function called with no nodes to recalculate"
293

    
294
    # TODO: check if we've really been called with the instance locks held
295

    
296
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297
    # future we might want to have different behaviors depending on the value
298
    # of self.recalculate_locks[locking.LEVEL_NODE]
299
    wanted_nodes = []
300
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301
      instance = self.context.cfg.GetInstanceInfo(instance_name)
302
      wanted_nodes.append(instance.primary_node)
303
      if not primary_only:
304
        wanted_nodes.extend(instance.secondary_nodes)
305

    
306
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310

    
311
    del self.recalculate_locks[locking.LEVEL_NODE]
312

    
313

    
314
class NoHooksLU(LogicalUnit):
315
  """Simple LU which runs no hooks.
316

317
  This LU is intended as a parent for other LogicalUnits which will
318
  run no hooks, in order to reduce duplicate code.
319

320
  """
321
  HPATH = None
322
  HTYPE = None
323

    
324
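# Illustrative sketch, not part of the original module: a minimal concurrent
# LU wired up the way the LogicalUnit docstrings above describe.  The class
# name and its "instance_name" opcode parameter are hypothetical (there is no
# matching opcode); real LUs below follow the same pattern.  A hook-running LU
# would additionally redefine HPATH/HTYPE and implement BuildHooksEnv.
class LUExampleNoop(NoHooksLU):
  """Example-only LU: lock one instance plus its nodes, then do nothing."""
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # canonicalise self.op.instance_name and declare the instance-level lock
    self._ExpandAndLockInstance()
    # node locks can only be computed once the instance lock is held, so they
    # are declared empty here and recalculated in DeclareLocks
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    # idempotent checks only; the instance is known to exist at this point
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Would act on instance %s" % self.instance.name)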

    
325
def _GetWantedNodes(lu, nodes):
326
  """Returns list of checked and expanded node names.
327

328
  Args:
329
    nodes: non-empty list of node names (strings) to be expanded
330

331
  """
332
  if not isinstance(nodes, list):
333
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
334

    
335
  if not nodes:
336
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337
      " non-empty list of nodes whose name is to be expanded.")
338

    
339
  wanted = []
340
  for name in nodes:
341
    node = lu.cfg.ExpandNodeName(name)
342
    if node is None:
343
      raise errors.OpPrereqError("No such node name '%s'" % name)
344
    wanted.append(node)
345

    
346
  return utils.NiceSort(wanted)
347

    
348

    
349
def _GetWantedInstances(lu, instances):
350
  """Returns list of checked and expanded instance names.
351

352
  Args:
353
    instances: list of instance names (strings), or an empty list for all
354

355
  """
356
  if not isinstance(instances, list):
357
    raise errors.OpPrereqError("Invalid argument type 'instances'")
358

    
359
  if instances:
360
    wanted = []
361

    
362
    for name in instances:
363
      instance = lu.cfg.ExpandInstanceName(name)
364
      if instance is None:
365
        raise errors.OpPrereqError("No such instance name '%s'" % name)
366
      wanted.append(instance)
367

    
368
  else:
369
    wanted = lu.cfg.GetInstanceList()
370
  return utils.NiceSort(wanted)
371

    
372

    
373
def _CheckOutputFields(static, dynamic, selected):
374
  """Checks whether all selected fields are valid.
375

376
  Args:
377
    static: Static fields
378
    dynamic: Dynamic fields
379

380
  """
381
  static_fields = frozenset(static)
382
  dynamic_fields = frozenset(dynamic)
383

    
384
  all_fields = static_fields | dynamic_fields
385

    
386
  if not all_fields.issuperset(selected):
387
    raise errors.OpPrereqError("Unknown output fields selected: %s"
388
                               % ",".join(frozenset(selected).
389
                                          difference(all_fields)))
390

    
391
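# Illustrative usage sketch, not part of the original module (field names
# invented); this mirrors how the LUQuery*/LUDiagnoseOS units below validate
# their output_fields:
#
#   _CheckOutputFields(static=["name", "pinst_cnt"],
#                      dynamic=["mfree", "dtotal"],
#                      selected=self.op.output_fields)
#
# Any selected field outside the union of the two sets raises
# errors.OpPrereqError naming the unknown fields.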

    
392
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393
                          memory, vcpus, nics):
394
  """Builds instance related env variables for hooks from single variables.
395

396
  Args:
397
    secondary_nodes: List of secondary nodes as strings
398
  """
399
  env = {
400
    "OP_TARGET": name,
401
    "INSTANCE_NAME": name,
402
    "INSTANCE_PRIMARY": primary_node,
403
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404
    "INSTANCE_OS_TYPE": os_type,
405
    "INSTANCE_STATUS": status,
406
    "INSTANCE_MEMORY": memory,
407
    "INSTANCE_VCPUS": vcpus,
408
  }
409

    
410
  if nics:
411
    nic_count = len(nics)
412
    for idx, (ip, bridge, mac) in enumerate(nics):
413
      if ip is None:
414
        ip = ""
415
      env["INSTANCE_NIC%d_IP" % idx] = ip
416
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418
  else:
419
    nic_count = 0
420

    
421
  env["INSTANCE_NIC_COUNT"] = nic_count
422

    
423
  return env
424

    
425

    
426
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
427
  """Builds instance related env variables for hooks from an object.
428

429
  Args:
430
    instance: objects.Instance object of instance
431
    override: dict of values to override
432
  """
433
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
434
  args = {
435
    'name': instance.name,
436
    'primary_node': instance.primary_node,
437
    'secondary_nodes': instance.secondary_nodes,
438
    'os_type': instance.os,
439
    'status': instance.status,
440
    'memory': bep[constants.BE_MEMORY],
441
    'vcpus': bep[constants.BE_VCPUS],
442
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
443
  }
444
  if override:
445
    args.update(override)
446
  return _BuildInstanceHookEnv(**args)
447

    
448
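# Illustrative sketch, not part of the original module: for a hypothetical
# instance the helpers above would produce an environment like the following
# (the hooks runner later adds the GANETI_ prefix to every key):
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-etch", "up", 128, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
#   => {"OP_TARGET": "inst1.example.com",
#       "INSTANCE_NAME": "inst1.example.com",
#       "INSTANCE_PRIMARY": "node1.example.com",
#       "INSTANCE_SECONDARIES": "node2.example.com",
#       "INSTANCE_OS_TYPE": "debian-etch",
#       "INSTANCE_STATUS": "up",
#       "INSTANCE_MEMORY": 128,
#       "INSTANCE_VCPUS": 1,
#       "INSTANCE_NIC0_IP": "192.0.2.10",
#       "INSTANCE_NIC0_BRIDGE": "xen-br0",
#       "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#       "INSTANCE_NIC_COUNT": 1}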

    
449
def _CheckInstanceBridgesExist(lu, instance):
450
  """Check that the brigdes needed by an instance exist.
451

452
  """
453
  # check bridges existence
454
  brlist = [nic.bridge for nic in instance.nics]
455
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
456
    raise errors.OpPrereqError("one or more target bridges %s does not"
457
                               " exist on destination node '%s'" %
458
                               (brlist, instance.primary_node))
459

    
460

    
461
class LUDestroyCluster(NoHooksLU):
462
  """Logical unit for destroying the cluster.
463

464
  """
465
  _OP_REQP = []
466

    
467
  def CheckPrereq(self):
468
    """Check prerequisites.
469

470
    This checks whether the cluster is empty.
471

472
    Any errors are signalled by raising errors.OpPrereqError.
473

474
    """
475
    master = self.cfg.GetMasterNode()
476

    
477
    nodelist = self.cfg.GetNodeList()
478
    if len(nodelist) != 1 or nodelist[0] != master:
479
      raise errors.OpPrereqError("There are still %d node(s) in"
480
                                 " this cluster." % (len(nodelist) - 1))
481
    instancelist = self.cfg.GetInstanceList()
482
    if instancelist:
483
      raise errors.OpPrereqError("There are still %d instance(s) in"
484
                                 " this cluster." % len(instancelist))
485

    
486
  def Exec(self, feedback_fn):
487
    """Destroys the cluster.
488

489
    """
490
    master = self.cfg.GetMasterNode()
491
    if not self.rpc.call_node_stop_master(master, False):
492
      raise errors.OpExecError("Could not disable the master role")
493
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
494
    utils.CreateBackup(priv_key)
495
    utils.CreateBackup(pub_key)
496
    return master
497

    
498

    
499
class LUVerifyCluster(LogicalUnit):
500
  """Verifies the cluster status.
501

502
  """
503
  HPATH = "cluster-verify"
504
  HTYPE = constants.HTYPE_CLUSTER
505
  _OP_REQP = ["skip_checks"]
506
  REQ_BGL = False
507

    
508
  def ExpandNames(self):
509
    self.needed_locks = {
510
      locking.LEVEL_NODE: locking.ALL_SET,
511
      locking.LEVEL_INSTANCE: locking.ALL_SET,
512
    }
513
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
514

    
515
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
516
                  remote_version, feedback_fn):
517
    """Run multiple tests against a node.
518

519
    Test list:
520
      - compares ganeti version
521
      - checks vg existence and size > 20G
522
      - checks config file checksum
523
      - checks ssh to other nodes
524

525
    Args:
526
      node: name of the node to check
527
      file_list: required list of files
528
      local_cksum: dictionary of local files and their checksums
529

530
    """
531
    # compares ganeti version
532
    local_version = constants.PROTOCOL_VERSION
533
    if not remote_version:
534
      feedback_fn("  - ERROR: connection to %s failed" % (node))
535
      return True
536

    
537
    if local_version != remote_version:
538
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
539
                      (local_version, node, remote_version))
540
      return True
541

    
542
    # checks vg existence and size > 20G
543

    
544
    bad = False
545
    if not vglist:
546
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
547
                      (node,))
548
      bad = True
549
    else:
550
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
551
                                            constants.MIN_VG_SIZE)
552
      if vgstatus:
553
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
554
        bad = True
555

    
556
    if not node_result:
557
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
558
      return True
559

    
560
    # checks config file checksum
561
    # checks ssh to any
562

    
563
    if 'filelist' not in node_result:
564
      bad = True
565
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
566
    else:
567
      remote_cksum = node_result['filelist']
568
      for file_name in file_list:
569
        if file_name not in remote_cksum:
570
          bad = True
571
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
572
        elif remote_cksum[file_name] != local_cksum[file_name]:
573
          bad = True
574
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
575

    
576
    if 'nodelist' not in node_result:
577
      bad = True
578
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
579
    else:
580
      if node_result['nodelist']:
581
        bad = True
582
        for node in node_result['nodelist']:
583
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
584
                          (node, node_result['nodelist'][node]))
585
    if 'node-net-test' not in node_result:
586
      bad = True
587
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
588
    else:
589
      if node_result['node-net-test']:
590
        bad = True
591
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
592
        for node in nlist:
593
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
594
                          (node, node_result['node-net-test'][node]))
595

    
596
    hyp_result = node_result.get('hypervisor', None)
597
    if isinstance(hyp_result, dict):
598
      for hv_name, hv_result in hyp_result.iteritems():
599
        if hv_result is not None:
600
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
601
                      (hv_name, hv_result))
602
    return bad
603

    
604
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
605
                      node_instance, feedback_fn):
606
    """Verify an instance.
607

608
    This function checks to see if the required block devices are
609
    available on the instance's node.
610

611
    """
612
    bad = False
613

    
614
    node_current = instanceconfig.primary_node
615

    
616
    node_vol_should = {}
617
    instanceconfig.MapLVsByNode(node_vol_should)
618

    
619
    for node in node_vol_should:
620
      for volume in node_vol_should[node]:
621
        if node not in node_vol_is or volume not in node_vol_is[node]:
622
          feedback_fn("  - ERROR: volume %s missing on node %s" %
623
                          (volume, node))
624
          bad = True
625

    
626
    if instanceconfig.status != 'down':
627
      if (node_current not in node_instance or
628
          not instance in node_instance[node_current]):
629
        feedback_fn("  - ERROR: instance %s not running on node %s" %
630
                        (instance, node_current))
631
        bad = True
632

    
633
    for node in node_instance:
634
      if node != node_current:
635
        if instance in node_instance[node]:
636
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
637
                          (instance, node))
638
          bad = True
639

    
640
    return bad
641

    
642
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
643
    """Verify if there are any unknown volumes in the cluster.
644

645
    The .os, .swap and backup volumes are ignored. All other volumes are
646
    reported as unknown.
647

648
    """
649
    bad = False
650

    
651
    for node in node_vol_is:
652
      for volume in node_vol_is[node]:
653
        if node not in node_vol_should or volume not in node_vol_should[node]:
654
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
655
                      (volume, node))
656
          bad = True
657
    return bad
658

    
659
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
660
    """Verify the list of running instances.
661

662
    This checks what instances are running but unknown to the cluster.
663

664
    """
665
    bad = False
666
    for node in node_instance:
667
      for runninginstance in node_instance[node]:
668
        if runninginstance not in instancelist:
669
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
670
                          (runninginstance, node))
671
          bad = True
672
    return bad
673

    
674
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
675
    """Verify N+1 Memory Resilience.
676

677
    Check that if one single node dies we can still start all the instances it
678
    was primary for.
679

680
    """
681
    bad = False
682

    
683
    for node, nodeinfo in node_info.iteritems():
684
      # This code checks that every node which is now listed as secondary has
685
      # enough memory to host all instances it is supposed to should a single
686
      # other node in the cluster fail.
687
      # FIXME: not ready for failover to an arbitrary node
688
      # FIXME: does not support file-backed instances
689
      # WARNING: we currently take into account down instances as well as up
690
      # ones, considering that even if they're down someone might want to start
691
      # them even in the event of a node failure.
692
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
693
        needed_mem = 0
694
        for instance in instances:
695
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
696
          if bep[constants.BE_AUTO_BALANCE]:
697
            needed_mem += bep[constants.BE_MEMORY]
698
        if nodeinfo['mfree'] < needed_mem:
699
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
700
                      " failovers should node %s fail" % (node, prinode))
701
          bad = True
702
    return bad
703
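  # Worked example for the check above (numbers invented for illustration): if
  # node N reports mfree = 2048 and its 'sinst-by-pnode' map is
  # {"nodeP": [inst1, inst2]} with BE_MEMORY values 1024 and 512 (both
  # auto-balanced), a failure of nodeP would need 1536 MiB on N; since
  # 1536 <= 2048 no error is reported, whereas mfree = 1024 would be flagged.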

    
704
  def CheckPrereq(self):
705
    """Check prerequisites.
706

707
    Transform the list of checks we're going to skip into a set and check that
708
    all its members are valid.
709

710
    """
711
    self.skip_set = frozenset(self.op.skip_checks)
712
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
713
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
714

    
715
  def BuildHooksEnv(self):
716
    """Build hooks env.
717

718
    Cluster-Verify hooks are run only in the post phase; a hook failure makes
719
    its output appear in the verify output and causes the verification to fail.
720

721
    """
722
    all_nodes = self.cfg.GetNodeList()
723
    # TODO: populate the environment with useful information for verify hooks
724
    env = {}
725
    return env, [], all_nodes
726

    
727
  def Exec(self, feedback_fn):
728
    """Verify integrity of cluster, performing various test on nodes.
729

730
    """
731
    bad = False
732
    feedback_fn("* Verifying global settings")
733
    for msg in self.cfg.VerifyConfig():
734
      feedback_fn("  - ERROR: %s" % msg)
735

    
736
    vg_name = self.cfg.GetVGName()
737
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
738
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
739
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
740
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
741
    i_non_redundant = [] # Non redundant instances
742
    i_non_a_balanced = [] # Non auto-balanced instances
743
    node_volume = {}
744
    node_instance = {}
745
    node_info = {}
746
    instance_cfg = {}
747

    
748
    # FIXME: verify OS list
749
    # do local checksums
750
    file_names = []
751
    file_names.append(constants.SSL_CERT_FILE)
752
    file_names.append(constants.CLUSTER_CONF_FILE)
753
    local_checksums = utils.FingerprintFiles(file_names)
754

    
755
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
756
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
757
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
758
    all_vglist = self.rpc.call_vg_list(nodelist)
759
    node_verify_param = {
760
      'filelist': file_names,
761
      'nodelist': nodelist,
762
      'hypervisor': hypervisors,
763
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
764
                        for node in nodeinfo]
765
      }
766
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
767
                                           self.cfg.GetClusterName())
768
    all_rversion = self.rpc.call_version(nodelist)
769
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
770
                                        self.cfg.GetHypervisorType())
771

    
772
    cluster = self.cfg.GetClusterInfo()
773
    for node in nodelist:
774
      feedback_fn("* Verifying node %s" % node)
775
      result = self._VerifyNode(node, file_names, local_checksums,
776
                                all_vglist[node], all_nvinfo[node],
777
                                all_rversion[node], feedback_fn)
778
      bad = bad or result
779

    
780
      # node_volume
781
      volumeinfo = all_volumeinfo[node]
782

    
783
      if isinstance(volumeinfo, basestring):
784
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
785
                    (node, volumeinfo[-400:].encode('string_escape')))
786
        bad = True
787
        node_volume[node] = {}
788
      elif not isinstance(volumeinfo, dict):
789
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
790
        bad = True
791
        continue
792
      else:
793
        node_volume[node] = volumeinfo
794

    
795
      # node_instance
796
      nodeinstance = all_instanceinfo[node]
797
      if type(nodeinstance) != list:
798
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
799
        bad = True
800
        continue
801

    
802
      node_instance[node] = nodeinstance
803

    
804
      # node_info
805
      nodeinfo = all_ninfo[node]
806
      if not isinstance(nodeinfo, dict):
807
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
808
        bad = True
809
        continue
810

    
811
      try:
812
        node_info[node] = {
813
          "mfree": int(nodeinfo['memory_free']),
814
          "dfree": int(nodeinfo['vg_free']),
815
          "pinst": [],
816
          "sinst": [],
817
          # dictionary holding all instances this node is secondary for,
818
          # grouped by their primary node. Each key is a cluster node, and each
819
          # value is a list of instances which have the key as primary and the
820
          # current node as secondary.  this is handy to calculate N+1 memory
821
          # availability if you can only failover from a primary to its
822
          # secondary.
823
          "sinst-by-pnode": {},
824
        }
825
      except ValueError:
826
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
827
        bad = True
828
        continue
829

    
830
    node_vol_should = {}
831

    
832
    for instance in instancelist:
833
      feedback_fn("* Verifying instance %s" % instance)
834
      inst_config = self.cfg.GetInstanceInfo(instance)
835
      result =  self._VerifyInstance(instance, inst_config, node_volume,
836
                                     node_instance, feedback_fn)
837
      bad = bad or result
838

    
839
      inst_config.MapLVsByNode(node_vol_should)
840

    
841
      instance_cfg[instance] = inst_config
842

    
843
      pnode = inst_config.primary_node
844
      if pnode in node_info:
845
        node_info[pnode]['pinst'].append(instance)
846
      else:
847
        feedback_fn("  - ERROR: instance %s, connection to primary node"
848
                    " %s failed" % (instance, pnode))
849
        bad = True
850

    
851
      # If the instance is non-redundant we cannot survive losing its primary
852
      # node, so we are not N+1 compliant. On the other hand we have no disk
853
      # templates with more than one secondary so that situation is not well
854
      # supported either.
855
      # FIXME: does not support file-backed instances
856
      if len(inst_config.secondary_nodes) == 0:
857
        i_non_redundant.append(instance)
858
      elif len(inst_config.secondary_nodes) > 1:
859
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
860
                    % instance)
861

    
862
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
863
        i_non_a_balanced.append(instance)
864

    
865
      for snode in inst_config.secondary_nodes:
866
        if snode in node_info:
867
          node_info[snode]['sinst'].append(instance)
868
          if pnode not in node_info[snode]['sinst-by-pnode']:
869
            node_info[snode]['sinst-by-pnode'][pnode] = []
870
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
871
        else:
872
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
873
                      " %s failed" % (instance, snode))
874

    
875
    feedback_fn("* Verifying orphan volumes")
876
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
877
                                       feedback_fn)
878
    bad = bad or result
879

    
880
    feedback_fn("* Verifying remaining instances")
881
    result = self._VerifyOrphanInstances(instancelist, node_instance,
882
                                         feedback_fn)
883
    bad = bad or result
884

    
885
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
886
      feedback_fn("* Verifying N+1 Memory redundancy")
887
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
888
      bad = bad or result
889

    
890
    feedback_fn("* Other Notes")
891
    if i_non_redundant:
892
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
893
                  % len(i_non_redundant))
894

    
895
    if i_non_a_balanced:
896
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
897
                  % len(i_non_a_balanced))
898

    
899
    return not bad
900

    
901
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
902
    """Analize the post-hooks' result, handle it, and send some
903
    nicely-formatted feedback back to the user.
904

905
    Args:
906
      phase: the hooks phase that has just been run
907
      hooks_results: the results of the multi-node hooks rpc call
908
      feedback_fn: function to send feedback back to the caller
909
      lu_result: previous Exec result
910

911
    """
912
    # We only really run POST phase hooks, and are only interested in
913
    # their results
914
    if phase == constants.HOOKS_PHASE_POST:
915
      # Used to change hooks' output to proper indentation
916
      indent_re = re.compile('^', re.M)
917
      feedback_fn("* Hooks Results")
918
      if not hooks_results:
919
        feedback_fn("  - ERROR: general communication failure")
920
        lu_result = 1
921
      else:
922
        for node_name in hooks_results:
923
          show_node_header = True
924
          res = hooks_results[node_name]
925
          if res is False or not isinstance(res, list):
926
            feedback_fn("    Communication failure")
927
            lu_result = 1
928
            continue
929
          for script, hkr, output in res:
930
            if hkr == constants.HKR_FAIL:
931
              # The node header is only shown once, if there are
932
              # failing hooks on that node
933
              if show_node_header:
934
                feedback_fn("  Node %s:" % node_name)
935
                show_node_header = False
936
              feedback_fn("    ERROR: Script %s failed, output:" % script)
937
              output = indent_re.sub('      ', output)
938
              feedback_fn("%s" % output)
939
              lu_result = 1
940

    
941
      return lu_result
942

    
943
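# Illustrative sketch, not part of the original module: the hooks_results
# mapping handled by LUVerifyCluster.HooksCallBack above looks roughly like
#
#   {"node1.example.com": [("10-check-foo", constants.HKR_SUCCESS, ""),
#                          ("20-check-bar", constants.HKR_FAIL, "bar broke")],
#    "node2.example.com": False}   # communication failure with that node
#
# (script names invented); only HKR_FAIL entries are reported in the verify
# output, and any failure or bad per-node result sets the LU result to 1.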

    
944
class LUVerifyDisks(NoHooksLU):
945
  """Verifies the cluster disks status.
946

947
  """
948
  _OP_REQP = []
949
  REQ_BGL = False
950

    
951
  def ExpandNames(self):
952
    self.needed_locks = {
953
      locking.LEVEL_NODE: locking.ALL_SET,
954
      locking.LEVEL_INSTANCE: locking.ALL_SET,
955
    }
956
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
957

    
958
  def CheckPrereq(self):
959
    """Check prerequisites.
960

961
    This has no prerequisites.
962

963
    """
964
    pass
965

    
966
  def Exec(self, feedback_fn):
967
    """Verify integrity of cluster disks.
968

969
    """
970
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
971

    
972
    vg_name = self.cfg.GetVGName()
973
    nodes = utils.NiceSort(self.cfg.GetNodeList())
974
    instances = [self.cfg.GetInstanceInfo(name)
975
                 for name in self.cfg.GetInstanceList()]
976

    
977
    nv_dict = {}
978
    for inst in instances:
979
      inst_lvs = {}
980
      if (inst.status != "up" or
981
          inst.disk_template not in constants.DTS_NET_MIRROR):
982
        continue
983
      inst.MapLVsByNode(inst_lvs)
984
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
985
      for node, vol_list in inst_lvs.iteritems():
986
        for vol in vol_list:
987
          nv_dict[(node, vol)] = inst
988

    
989
    if not nv_dict:
990
      return result
991

    
992
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
993

    
994
    to_act = set()
995
    for node in nodes:
996
      # node_volume
997
      lvs = node_lvs[node]
998

    
999
      if isinstance(lvs, basestring):
1000
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1001
        res_nlvm[node] = lvs
1002
      elif not isinstance(lvs, dict):
1003
        logging.warning("Connection to node %s failed or invalid data"
1004
                        " returned", node)
1005
        res_nodes.append(node)
1006
        continue
1007

    
1008
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1009
        inst = nv_dict.pop((node, lv_name), None)
1010
        if (not lv_online and inst is not None
1011
            and inst.name not in res_instances):
1012
          res_instances.append(inst.name)
1013

    
1014
    # any leftover items in nv_dict are missing LVs, let's arrange the
1015
    # data better
1016
    for key, inst in nv_dict.iteritems():
1017
      if inst.name not in res_missing:
1018
        res_missing[inst.name] = []
1019
      res_missing[inst.name].append(key)
1020

    
1021
    return result
1022

    
1023
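# Illustrative sketch, not part of the original module: the 4-tuple returned
# by LUVerifyDisks.Exec above could look like (all names invented)
#
#   (["node3"],                              # nodes that could not be queried
#    {"node2": "lvm command error text"},    # per-node LVM error output
#    ["inst1"],                              # instances with offline mirror LVs
#    {"inst2": [("node1", "xenvg/disk0")]})  # missing (node, LV) pairs per instance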

    
1024
class LURenameCluster(LogicalUnit):
1025
  """Rename the cluster.
1026

1027
  """
1028
  HPATH = "cluster-rename"
1029
  HTYPE = constants.HTYPE_CLUSTER
1030
  _OP_REQP = ["name"]
1031

    
1032
  def BuildHooksEnv(self):
1033
    """Build hooks env.
1034

1035
    """
1036
    env = {
1037
      "OP_TARGET": self.cfg.GetClusterName(),
1038
      "NEW_NAME": self.op.name,
1039
      }
1040
    mn = self.cfg.GetMasterNode()
1041
    return env, [mn], [mn]
1042

    
1043
  def CheckPrereq(self):
1044
    """Verify that the passed name is a valid one.
1045

1046
    """
1047
    hostname = utils.HostInfo(self.op.name)
1048

    
1049
    new_name = hostname.name
1050
    self.ip = new_ip = hostname.ip
1051
    old_name = self.cfg.GetClusterName()
1052
    old_ip = self.cfg.GetMasterIP()
1053
    if new_name == old_name and new_ip == old_ip:
1054
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1055
                                 " cluster has changed")
1056
    if new_ip != old_ip:
1057
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1058
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1059
                                   " reachable on the network. Aborting." %
1060
                                   new_ip)
1061

    
1062
    self.op.name = new_name
1063

    
1064
  def Exec(self, feedback_fn):
1065
    """Rename the cluster.
1066

1067
    """
1068
    clustername = self.op.name
1069
    ip = self.ip
1070

    
1071
    # shutdown the master IP
1072
    master = self.cfg.GetMasterNode()
1073
    if not self.rpc.call_node_stop_master(master, False):
1074
      raise errors.OpExecError("Could not disable the master role")
1075

    
1076
    try:
1077
      # modify the sstore
1078
      # TODO: sstore
1079
      ss.SetKey(ss.SS_MASTER_IP, ip)
1080
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1081

    
1082
      # Distribute updated ss config to all nodes
1083
      myself = self.cfg.GetNodeInfo(master)
1084
      dist_nodes = self.cfg.GetNodeList()
1085
      if myself.name in dist_nodes:
1086
        dist_nodes.remove(myself.name)
1087

    
1088
      logging.debug("Copying updated ssconf data to all nodes")
1089
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1090
        fname = ss.KeyToFilename(keyname)
1091
        result = self.rpc.call_upload_file(dist_nodes, fname)
1092
        for to_node in dist_nodes:
1093
          if not result[to_node]:
1094
            self.LogWarning("Copy of file %s to node %s failed",
1095
                            fname, to_node)
1096
    finally:
1097
      if not self.rpc.call_node_start_master(master, False):
1098
        self.LogWarning("Could not re-enable the master role on"
1099
                        " the master, please restart manually.")
1100

    
1101

    
1102
def _RecursiveCheckIfLVMBased(disk):
1103
  """Check if the given disk or its children are lvm-based.
1104

1105
  Args:
1106
    disk: ganeti.objects.Disk object
1107

1108
  Returns:
1109
    boolean indicating whether a LD_LV dev_type was found or not
1110

1111
  """
1112
  if disk.children:
1113
    for chdisk in disk.children:
1114
      if _RecursiveCheckIfLVMBased(chdisk):
1115
        return True
1116
  return disk.dev_type == constants.LD_LV
1117

    
1118
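# Illustrative sketch, not part of the original module (object construction
# shown only schematically): a DRBD disk whose children are logical volumes is
# reported as lvm-based through the recursion above, e.g.
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV, ...)
#   lv_b = objects.Disk(dev_type=constants.LD_LV, ...)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b], ...)
#   _RecursiveCheckIfLVMBased(drbd)  # True, via the LD_LV children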

    
1119
class LUSetClusterParams(LogicalUnit):
1120
  """Change the parameters of the cluster.
1121

1122
  """
1123
  HPATH = "cluster-modify"
1124
  HTYPE = constants.HTYPE_CLUSTER
1125
  _OP_REQP = []
1126
  REQ_BGL = False
1127

    
1128
  def ExpandNames(self):
1129
    # FIXME: in the future maybe other cluster params won't require checking on
1130
    # all nodes to be modified.
1131
    self.needed_locks = {
1132
      locking.LEVEL_NODE: locking.ALL_SET,
1133
    }
1134
    self.share_locks[locking.LEVEL_NODE] = 1
1135

    
1136
  def BuildHooksEnv(self):
1137
    """Build hooks env.
1138

1139
    """
1140
    env = {
1141
      "OP_TARGET": self.cfg.GetClusterName(),
1142
      "NEW_VG_NAME": self.op.vg_name,
1143
      }
1144
    mn = self.cfg.GetMasterNode()
1145
    return env, [mn], [mn]
1146

    
1147
  def CheckPrereq(self):
1148
    """Check prerequisites.
1149

1150
    This checks whether the given params don't conflict and
1151
    if the given volume group is valid.
1152

1153
    """
1154
    # FIXME: This only works because there is only one parameter that can be
1155
    # changed or removed.
1156
    if self.op.vg_name is not None and not self.op.vg_name:
1157
      instances = self.cfg.GetAllInstancesInfo().values()
1158
      for inst in instances:
1159
        for disk in inst.disks:
1160
          if _RecursiveCheckIfLVMBased(disk):
1161
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1162
                                       " lvm-based instances exist")
1163

    
1164
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1165

    
1166
    # if vg_name not None, checks given volume group on all nodes
1167
    if self.op.vg_name:
1168
      vglist = self.rpc.call_vg_list(node_list)
1169
      for node in node_list:
1170
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1171
                                              constants.MIN_VG_SIZE)
1172
        if vgstatus:
1173
          raise errors.OpPrereqError("Error on node '%s': %s" %
1174
                                     (node, vgstatus))
1175

    
1176
    self.cluster = cluster = self.cfg.GetClusterInfo()
1177
    # beparams changes do not need validation (we can't validate?),
1178
    # but we still process here
1179
    if self.op.beparams:
1180
      self.new_beparams = cluster.FillDict(
1181
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1182

    
1183
    # hypervisor list/parameters
1184
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1185
    if self.op.hvparams:
1186
      if not isinstance(self.op.hvparams, dict):
1187
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1188
      for hv_name, hv_dict in self.op.hvparams.items():
1189
        if hv_name not in self.new_hvparams:
1190
          self.new_hvparams[hv_name] = hv_dict
1191
        else:
1192
          self.new_hvparams[hv_name].update(hv_dict)
1193

    
1194
    if self.op.enabled_hypervisors is not None:
1195
      self.hv_list = self.op.enabled_hypervisors
1196
    else:
1197
      self.hv_list = cluster.enabled_hypervisors
1198

    
1199
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1200
      # either the enabled list has changed, or the parameters have, validate
1201
      for hv_name, hv_params in self.new_hvparams.items():
1202
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1203
            (self.op.enabled_hypervisors and
1204
             hv_name in self.op.enabled_hypervisors)):
1205
          # either this is a new hypervisor, or its parameters have changed
1206
          hv_class = hypervisor.GetHypervisor(hv_name)
1207
          hv_class.CheckParameterSyntax(hv_params)
1208
          _CheckHVParams(self, node_list, hv_name, hv_params)
1209

    
1210
  def Exec(self, feedback_fn):
1211
    """Change the parameters of the cluster.
1212

1213
    """
1214
    if self.op.vg_name is not None:
1215
      if self.op.vg_name != self.cfg.GetVGName():
1216
        self.cfg.SetVGName(self.op.vg_name)
1217
      else:
1218
        feedback_fn("Cluster LVM configuration already in desired"
1219
                    " state, not changing")
1220
    if self.op.hvparams:
1221
      self.cluster.hvparams = self.new_hvparams
1222
    if self.op.enabled_hypervisors is not None:
1223
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1224
    if self.op.beparams:
1225
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1226
    self.cfg.Update(self.cluster)
1227

    
1228
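# Illustrative sketch, not part of the original module (hypervisor name and
# values invented): in LUSetClusterParams.CheckPrereq above an opcode carrying
#
#   hvparams = {"xen-pvm": {"kernel_path": "/boot/vmlinuz-2.6-xenU"}}
#
# is merged per-hypervisor into a copy of cluster.hvparams, so only the given
# parameters change; the merged settings are then syntax-checked via
# hypervisor.GetHypervisor(name).CheckParameterSyntax and verified on the
# locked nodes through _CheckHVParams.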

    
1229
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1230
  """Sleep and poll for an instance's disk to sync.
1231

1232
  """
1233
  if not instance.disks:
1234
    return True
1235

    
1236
  if not oneshot:
1237
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1238

    
1239
  node = instance.primary_node
1240

    
1241
  for dev in instance.disks:
1242
    lu.cfg.SetDiskID(dev, node)
1243

    
1244
  retries = 0
1245
  while True:
1246
    max_time = 0
1247
    done = True
1248
    cumul_degraded = False
1249
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1250
    if not rstats:
1251
      lu.LogWarning("Can't get any data from node %s", node)
1252
      retries += 1
1253
      if retries >= 10:
1254
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1255
                                 " aborting." % node)
1256
      time.sleep(6)
1257
      continue
1258
    retries = 0
1259
    for i in range(len(rstats)):
1260
      mstat = rstats[i]
1261
      if mstat is None:
1262
        lu.LogWarning("Can't compute data for node %s/%s",
1263
                           node, instance.disks[i].iv_name)
1264
        continue
1265
      # we ignore the ldisk parameter
1266
      perc_done, est_time, is_degraded, _ = mstat
1267
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1268
      if perc_done is not None:
1269
        done = False
1270
        if est_time is not None:
1271
          rem_time = "%d estimated seconds remaining" % est_time
1272
          max_time = est_time
1273
        else:
1274
          rem_time = "no time estimate"
1275
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1276
                        (instance.disks[i].iv_name, perc_done, rem_time))
1277
    if done or oneshot:
1278
      break
1279

    
1280
    time.sleep(min(60, max_time))
1281

    
1282
  if done:
1283
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1284
  return not cumul_degraded
1285

    
1286

    
1287
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1288
  """Check that mirrors are not degraded.
1289

1290
  The ldisk parameter, if True, will change the test from the
1291
  is_degraded attribute (which represents overall non-ok status for
1292
  the device(s)) to the ldisk (representing the local storage status).
1293

1294
  """
1295
  lu.cfg.SetDiskID(dev, node)
1296
  if ldisk:
1297
    idx = 6
1298
  else:
1299
    idx = 5
1300

    
1301
  result = True
1302
  if on_primary or dev.AssembleOnSecondary():
1303
    rstats = lu.rpc.call_blockdev_find(node, dev)
1304
    if not rstats:
1305
      logging.warning("Node %s: disk degraded, not found or node down", node)
1306
      result = False
1307
    else:
1308
      result = result and (not rstats[idx])
1309
  if dev.children:
1310
    for child in dev.children:
1311
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1312

    
1313
  return result
1314

    
1315

    
1316
class LUDiagnoseOS(NoHooksLU):
1317
  """Logical unit for OS diagnose/query.
1318

1319
  """
1320
  _OP_REQP = ["output_fields", "names"]
1321
  REQ_BGL = False
1322

    
1323
  def ExpandNames(self):
1324
    if self.op.names:
1325
      raise errors.OpPrereqError("Selective OS query not supported")
1326

    
1327
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1328
    _CheckOutputFields(static=[],
1329
                       dynamic=self.dynamic_fields,
1330
                       selected=self.op.output_fields)
1331

    
1332
    # Lock all nodes, in shared mode
1333
    self.needed_locks = {}
1334
    self.share_locks[locking.LEVEL_NODE] = 1
1335
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1336

    
1337
  def CheckPrereq(self):
1338
    """Check prerequisites.
1339

1340
    """
1341

    
1342
  @staticmethod
1343
  def _DiagnoseByOS(node_list, rlist):
1344
    """Remaps a per-node return list into an a per-os per-node dictionary
1345

1346
      Args:
1347
        node_list: a list with the names of all nodes
1348
        rlist: a map with node names as keys and OS objects as values
1349

1350
      Returns:
1351
        map: a map with osnames as keys and as value another map, with
1352
             nodes as
1353
             keys and list of OS objects as values
1354
             e.g. {"debian-etch": {"node1": [<object>,...],
1355
                                   "node2": [<object>,]}
1356
                  }
1357

1358
    """
1359
    all_os = {}
1360
    for node_name, nr in rlist.iteritems():
1361
      if not nr:
1362
        continue
1363
      for os_obj in nr:
1364
        if os_obj.name not in all_os:
1365
          # build a list of nodes for this os containing empty lists
1366
          # for each node in node_list
1367
          all_os[os_obj.name] = {}
1368
          for nname in node_list:
1369
            all_os[os_obj.name][nname] = []
1370
        all_os[os_obj.name][node_name].append(os_obj)
1371
    return all_os
1372

    
1373
  def Exec(self, feedback_fn):
1374
    """Compute the list of OSes.
1375

1376
    """
1377
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1378
    node_data = self.rpc.call_os_diagnose(node_list)
1379
    if node_data == False:
1380
      raise errors.OpExecError("Can't gather the list of OSes")
1381
    pol = self._DiagnoseByOS(node_list, node_data)
1382
    output = []
1383
    for os_name, os_data in pol.iteritems():
1384
      row = []
1385
      for field in self.op.output_fields:
1386
        if field == "name":
1387
          val = os_name
1388
        elif field == "valid":
1389
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1390
        elif field == "node_status":
1391
          val = {}
1392
          for node_name, nos_list in os_data.iteritems():
1393
            val[node_name] = [(v.status, v.path) for v in nos_list]
1394
        else:
1395
          raise errors.ParameterError(field)
1396
        row.append(val)
1397
      output.append(row)
1398

    
1399
    return output
1400

    
1401
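# Illustrative sketch, not part of the original module: with
# output_fields = ["name", "valid", "node_status"], LUDiagnoseOS.Exec above
# returns one row per OS name, roughly
#
#   [["debian-etch", True, {"node1": [(status, path), ...],
#                           "node2": [(status, path), ...]}]]
#
# where "valid" is true only if every node returned at least one OS object of
# that name.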

    
1402
class LURemoveNode(LogicalUnit):
1403
  """Logical unit for removing a node.
1404

1405
  """
1406
  HPATH = "node-remove"
1407
  HTYPE = constants.HTYPE_NODE
1408
  _OP_REQP = ["node_name"]
1409

    
1410
  def BuildHooksEnv(self):
1411
    """Build hooks env.
1412

1413
    This doesn't run on the target node in the pre phase as a failed
1414
    node would then be impossible to remove.
1415

1416
    """
1417
    env = {
1418
      "OP_TARGET": self.op.node_name,
1419
      "NODE_NAME": self.op.node_name,
1420
      }
1421
    all_nodes = self.cfg.GetNodeList()
1422
    all_nodes.remove(self.op.node_name)
1423
    return env, all_nodes, all_nodes
1424

    
1425
  def CheckPrereq(self):
1426
    """Check prerequisites.
1427

1428
    This checks:
1429
     - the node exists in the configuration
1430
     - it does not have primary or secondary instances
1431
     - it's not the master
1432

1433
    Any errors are signalled by raising errors.OpPrereqError.
1434

1435
    """
1436
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1437
    if node is None:
1438
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1439

    
1440
    instance_list = self.cfg.GetInstanceList()
1441

    
1442
    masternode = self.cfg.GetMasterNode()
1443
    if node.name == masternode:
1444
      raise errors.OpPrereqError("Node is the master node,"
1445
                                 " you need to failover first.")
1446

    
1447
    for instance_name in instance_list:
1448
      instance = self.cfg.GetInstanceInfo(instance_name)
1449
      if node.name == instance.primary_node:
1450
        raise errors.OpPrereqError("Instance %s still running on the node,"
1451
                                   " please remove first." % instance_name)
1452
      if node.name in instance.secondary_nodes:
1453
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1454
                                   " please remove first." % instance_name)
1455
    self.op.node_name = node.name
1456
    self.node = node
1457

    
1458
  def Exec(self, feedback_fn):
1459
    """Removes the node from the cluster.
1460

1461
    """
1462
    node = self.node
1463
    logging.info("Stopping the node daemon and removing configs from node %s",
1464
                 node.name)
1465

    
1466
    self.context.RemoveNode(node.name)
1467

    
1468
    self.rpc.call_node_leave_cluster(node.name)
1469

    
1470

    
1471
class LUQueryNodes(NoHooksLU):
1472
  """Logical unit for querying nodes.
1473

1474
  """
1475
  _OP_REQP = ["output_fields", "names"]
1476
  REQ_BGL = False
1477

    
1478
  def ExpandNames(self):
1479
    self.dynamic_fields = frozenset([
1480
      "dtotal", "dfree",
1481
      "mtotal", "mnode", "mfree",
1482
      "bootid",
1483
      "ctotal",
1484
      ])
1485

    
1486
    self.static_fields = frozenset([
1487
      "name", "pinst_cnt", "sinst_cnt",
1488
      "pinst_list", "sinst_list",
1489
      "pip", "sip", "tags",
1490
      "serial_no",
1491
      ])
1492

    
1493
    _CheckOutputFields(static=self.static_fields,
1494
                       dynamic=self.dynamic_fields,
1495
                       selected=self.op.output_fields)
1496

    
1497
    self.needed_locks = {}
1498
    self.share_locks[locking.LEVEL_NODE] = 1
1499

    
1500
    if self.op.names:
1501
      self.wanted = _GetWantedNodes(self, self.op.names)
1502
    else:
1503
      self.wanted = locking.ALL_SET
1504

    
1505
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1506
    if self.do_locking:
1507
      # if we don't request only static fields, we need to lock the nodes
1508
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1509

    
1510

    
1511
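  # Illustrative note, not part of the original code: the locking decision
  # above depends only on the requested fields.  With the static/dynamic
  # sets defined in ExpandNames,
  #   output_fields = ["name", "pinst_cnt"]       -> do_locking is False
  #   output_fields = ["name", "mfree", "dtotal"] -> do_locking is True
  # i.e. node locks are only acquired when at least one live (dynamic) field
  # has to be fetched from the nodes themselves.
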
  def CheckPrereq(self):
1512
    """Check prerequisites.
1513

1514
    """
1515
    # The validation of the node list is done in _GetWantedNodes,
1516
    # if non-empty; an empty list needs no validation
1517
    pass
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Computes the list of nodes and their attributes.
1521

1522
    """
1523
    all_info = self.cfg.GetAllNodesInfo()
1524
    if self.do_locking:
1525
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1526
    elif self.wanted != locking.ALL_SET:
1527
      nodenames = self.wanted
1528
      missing = set(nodenames).difference(all_info.keys())
1529
      if missing:
1530
        raise errors.OpExecError(
1531
          "Some nodes were removed before retrieving their data: %s" % missing)
1532
    else:
1533
      nodenames = all_info.keys()
1534

    
1535
    nodenames = utils.NiceSort(nodenames)
1536
    nodelist = [all_info[name] for name in nodenames]
1537

    
1538
    # begin data gathering
1539

    
1540
    if self.dynamic_fields.intersection(self.op.output_fields):
1541
      live_data = {}
1542
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1543
                                          self.cfg.GetHypervisorType())
1544
      for name in nodenames:
1545
        nodeinfo = node_data.get(name, None)
1546
        if nodeinfo:
1547
          live_data[name] = {
1548
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1549
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1550
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1551
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1552
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1553
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1554
            "bootid": nodeinfo['bootid'],
1555
            }
1556
        else:
1557
          live_data[name] = {}
1558
    else:
1559
      live_data = dict.fromkeys(nodenames, {})
1560

    
1561
    node_to_primary = dict([(name, set()) for name in nodenames])
1562
    node_to_secondary = dict([(name, set()) for name in nodenames])
1563

    
1564
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1565
                             "sinst_cnt", "sinst_list"))
1566
    if inst_fields & frozenset(self.op.output_fields):
1567
      instancelist = self.cfg.GetInstanceList()
1568

    
1569
      for instance_name in instancelist:
1570
        inst = self.cfg.GetInstanceInfo(instance_name)
1571
        if inst.primary_node in node_to_primary:
1572
          node_to_primary[inst.primary_node].add(inst.name)
1573
        for secnode in inst.secondary_nodes:
1574
          if secnode in node_to_secondary:
1575
            node_to_secondary[secnode].add(inst.name)
1576

    
1577
    # end data gathering
1578

    
1579
    output = []
1580
    for node in nodelist:
1581
      node_output = []
1582
      for field in self.op.output_fields:
1583
        if field == "name":
1584
          val = node.name
1585
        elif field == "pinst_list":
1586
          val = list(node_to_primary[node.name])
1587
        elif field == "sinst_list":
1588
          val = list(node_to_secondary[node.name])
1589
        elif field == "pinst_cnt":
1590
          val = len(node_to_primary[node.name])
1591
        elif field == "sinst_cnt":
1592
          val = len(node_to_secondary[node.name])
1593
        elif field == "pip":
1594
          val = node.primary_ip
1595
        elif field == "sip":
1596
          val = node.secondary_ip
1597
        elif field == "tags":
1598
          val = list(node.GetTags())
1599
        elif field == "serial_no":
1600
          val = node.serial_no
1601
        elif field in self.dynamic_fields:
1602
          val = live_data[node.name].get(field, None)
1603
        else:
1604
          raise errors.ParameterError(field)
1605
        node_output.append(val)
1606
      output.append(node_output)
1607

    
1608
    return output
1609

    
1610

    
1611
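# Illustrative sketch, not part of the original module: a consumer of the
# LUQueryNodes output above receives one list per node, ordered like
# op.output_fields; pairing them back up is a one-liner.  The 'fields' and
# 'rows' arguments are assumed to be the requested field list and the
# Exec() return value.
def _ExampleZipNodeQueryRows(fields, rows):
  """Return the query rows as a list of {field: value} dictionaries."""
  return [dict(zip(fields, row)) for row in rows]
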
class LUQueryNodeVolumes(NoHooksLU):
1612
  """Logical unit for getting volumes on node(s).
1613

1614
  """
1615
  _OP_REQP = ["nodes", "output_fields"]
1616
  REQ_BGL = False
1617

    
1618
  def ExpandNames(self):
1619
    _CheckOutputFields(static=["node"],
1620
                       dynamic=["phys", "vg", "name", "size", "instance"],
1621
                       selected=self.op.output_fields)
1622

    
1623
    self.needed_locks = {}
1624
    self.share_locks[locking.LEVEL_NODE] = 1
1625
    if not self.op.nodes:
1626
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1627
    else:
1628
      self.needed_locks[locking.LEVEL_NODE] = \
1629
        _GetWantedNodes(self, self.op.nodes)
1630

    
1631
  def CheckPrereq(self):
1632
    """Check prerequisites.
1633

1634
    This checks that the fields required are valid output fields.
1635

1636
    """
1637
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1638

    
1639
  def Exec(self, feedback_fn):
1640
    """Computes the list of nodes and their attributes.
1641

1642
    """
1643
    nodenames = self.nodes
1644
    volumes = self.rpc.call_node_volumes(nodenames)
1645

    
1646
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1647
             in self.cfg.GetInstanceList()]
1648

    
1649
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1650

    
1651
    output = []
1652
    for node in nodenames:
1653
      if node not in volumes or not volumes[node]:
1654
        continue
1655

    
1656
      node_vols = volumes[node][:]
1657
      node_vols.sort(key=lambda vol: vol['dev'])
1658

    
1659
      for vol in node_vols:
1660
        node_output = []
1661
        for field in self.op.output_fields:
1662
          if field == "node":
1663
            val = node
1664
          elif field == "phys":
1665
            val = vol['dev']
1666
          elif field == "vg":
1667
            val = vol['vg']
1668
          elif field == "name":
1669
            val = vol['name']
1670
          elif field == "size":
1671
            val = int(float(vol['size']))
1672
          elif field == "instance":
1673
            for inst in ilist:
1674
              if node not in lv_by_node[inst]:
1675
                continue
1676
              if vol['name'] in lv_by_node[inst][node]:
1677
                val = inst.name
1678
                break
1679
            else:
1680
              val = '-'
1681
          else:
1682
            raise errors.ParameterError(field)
1683
          node_output.append(str(val))
1684

    
1685
        output.append(node_output)
1686

    
1687
    return output
1688

    
1689

    
1690
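# Illustrative sketch, not part of the original module: the "instance"
# column above uses Python's for/else construct - the else branch runs only
# when the loop finished without hitting 'break'.  A minimal standalone
# example of the same idiom; 'lvs_by_instance' is assumed to map instance
# names to lists of LV names.
def _ExampleOwnerOfVolume(volume_name, lvs_by_instance):
  """Return the name of the instance owning an LV, or '-' if unowned."""
  for iname, lvs in lvs_by_instance.items():
    if volume_name in lvs:
      owner = iname
      break
  else:
    owner = "-"
  return owner
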
class LUAddNode(LogicalUnit):
1691
  """Logical unit for adding node to the cluster.
1692

1693
  """
1694
  HPATH = "node-add"
1695
  HTYPE = constants.HTYPE_NODE
1696
  _OP_REQP = ["node_name"]
1697

    
1698
  def BuildHooksEnv(self):
1699
    """Build hooks env.
1700

1701
    This will run on all nodes before, and on all nodes + the new node after.
1702

1703
    """
1704
    env = {
1705
      "OP_TARGET": self.op.node_name,
1706
      "NODE_NAME": self.op.node_name,
1707
      "NODE_PIP": self.op.primary_ip,
1708
      "NODE_SIP": self.op.secondary_ip,
1709
      }
1710
    nodes_0 = self.cfg.GetNodeList()
1711
    nodes_1 = nodes_0 + [self.op.node_name, ]
1712
    return env, nodes_0, nodes_1
1713

    
1714
  def CheckPrereq(self):
1715
    """Check prerequisites.
1716

1717
    This checks:
1718
     - the new node is not already in the config
1719
     - it is resolvable
1720
     - its parameters (single/dual homed) matches the cluster
1721

1722
    Any errors are signalled by raising errors.OpPrereqError.
1723

1724
    """
1725
    node_name = self.op.node_name
1726
    cfg = self.cfg
1727

    
1728
    dns_data = utils.HostInfo(node_name)
1729

    
1730
    node = dns_data.name
1731
    primary_ip = self.op.primary_ip = dns_data.ip
1732
    secondary_ip = getattr(self.op, "secondary_ip", None)
1733
    if secondary_ip is None:
1734
      secondary_ip = primary_ip
1735
    if not utils.IsValidIP(secondary_ip):
1736
      raise errors.OpPrereqError("Invalid secondary IP given")
1737
    self.op.secondary_ip = secondary_ip
1738

    
1739
    node_list = cfg.GetNodeList()
1740
    if not self.op.readd and node in node_list:
1741
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1742
                                 node)
1743
    elif self.op.readd and node not in node_list:
1744
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1745

    
1746
    for existing_node_name in node_list:
1747
      existing_node = cfg.GetNodeInfo(existing_node_name)
1748

    
1749
      if self.op.readd and node == existing_node_name:
1750
        if (existing_node.primary_ip != primary_ip or
1751
            existing_node.secondary_ip != secondary_ip):
1752
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1753
                                     " address configuration as before")
1754
        continue
1755

    
1756
      if (existing_node.primary_ip == primary_ip or
1757
          existing_node.secondary_ip == primary_ip or
1758
          existing_node.primary_ip == secondary_ip or
1759
          existing_node.secondary_ip == secondary_ip):
1760
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1761
                                   " existing node %s" % existing_node.name)
1762

    
1763
    # check that the type of the node (single versus dual homed) is the
1764
    # same as for the master
1765
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1766
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1767
    newbie_singlehomed = secondary_ip == primary_ip
1768
    if master_singlehomed != newbie_singlehomed:
1769
      if master_singlehomed:
1770
        raise errors.OpPrereqError("The master has no private ip but the"
1771
                                   " new node has one")
1772
      else:
1773
        raise errors.OpPrereqError("The master has a private ip but the"
1774
                                   " new node doesn't have one")
1775

    
1776
    # checks reachability
1777
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1778
      raise errors.OpPrereqError("Node not reachable by ping")
1779

    
1780
    if not newbie_singlehomed:
1781
      # check reachability from my secondary ip to newbie's secondary ip
1782
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1783
                           source=myself.secondary_ip):
1784
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1785
                                   " based ping to noded port")
1786

    
1787
    self.new_node = objects.Node(name=node,
1788
                                 primary_ip=primary_ip,
1789
                                 secondary_ip=secondary_ip)
1790

    
1791
  def Exec(self, feedback_fn):
1792
    """Adds the new node to the cluster.
1793

1794
    """
1795
    new_node = self.new_node
1796
    node = new_node.name
1797

    
1798
    # check connectivity
1799
    result = self.rpc.call_version([node])[node]
1800
    if result:
1801
      if constants.PROTOCOL_VERSION == result:
1802
        logging.info("Communication to node %s fine, sw version %s match",
1803
                     node, result)
1804
      else:
1805
        raise errors.OpExecError("Version mismatch master version %s,"
1806
                                 " node version %s" %
1807
                                 (constants.PROTOCOL_VERSION, result))
1808
    else:
1809
      raise errors.OpExecError("Cannot get version from the new node")
1810

    
1811
    # setup ssh on node
1812
    logging.info("Copy ssh key to node %s", node)
1813
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1814
    keyarray = []
1815
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1816
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1817
                priv_key, pub_key]
1818

    
1819
    for i in keyfiles:
1820
      f = open(i, 'r')
1821
      try:
1822
        keyarray.append(f.read())
1823
      finally:
1824
        f.close()
1825

    
1826
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1827
                                    keyarray[2],
1828
                                    keyarray[3], keyarray[4], keyarray[5])
1829

    
1830
    if not result:
1831
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1832

    
1833
    # Add node to our /etc/hosts, and add key to known_hosts
1834
    utils.AddHostToEtcHosts(new_node.name)
1835

    
1836
    if new_node.secondary_ip != new_node.primary_ip:
1837
      if not self.rpc.call_node_has_ip_address(new_node.name,
1838
                                               new_node.secondary_ip):
1839
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1840
                                 " you gave (%s). Please fix and re-run this"
1841
                                 " command." % new_node.secondary_ip)
1842

    
1843
    node_verify_list = [self.cfg.GetMasterNode()]
1844
    node_verify_param = {
1845
      'nodelist': [node],
1846
      # TODO: do a node-net-test as well?
1847
    }
1848

    
1849
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1850
                                       self.cfg.GetClusterName())
1851
    for verifier in node_verify_list:
1852
      if not result[verifier]:
1853
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1854
                                 " for remote verification" % verifier)
1855
      if result[verifier]['nodelist']:
1856
        for failed in result[verifier]['nodelist']:
1857
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1858
                      (verifier, result[verifier]['nodelist'][failed]))
1859
        raise errors.OpExecError("ssh/hostname verification failed.")
1860

    
1861
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1862
    # including the node just added
1863
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1864
    dist_nodes = self.cfg.GetNodeList()
1865
    if not self.op.readd:
1866
      dist_nodes.append(node)
1867
    if myself.name in dist_nodes:
1868
      dist_nodes.remove(myself.name)
1869

    
1870
    logging.debug("Copying hosts and known_hosts to all nodes")
1871
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1872
      result = self.rpc.call_upload_file(dist_nodes, fname)
1873
      for to_node in dist_nodes:
1874
        if not result[to_node]:
1875
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1876

    
1877
    to_copy = []
1878
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1879
      to_copy.append(constants.VNC_PASSWORD_FILE)
1880
    for fname in to_copy:
1881
      result = self.rpc.call_upload_file([node], fname)
1882
      if not result[node]:
1883
        logging.error("Could not copy file %s to node %s", fname, node)
1884

    
1885
    if self.op.readd:
1886
      self.context.ReaddNode(new_node)
1887
    else:
1888
      self.context.AddNode(new_node)
1889

    
1890

    
1891
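# Illustrative sketch, not part of the original module: the single- versus
# dual-homed consistency check in LUAddNode.CheckPrereq reduces to comparing
# a node's primary and secondary addresses; a hypothetical predicate:
def _ExampleIsSingleHomed(primary_ip, secondary_ip):
  """Return True if a node uses the same address for both networks."""
  return secondary_ip is None or secondary_ip == primary_ip
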
class LUQueryClusterInfo(NoHooksLU):
1892
  """Query cluster configuration.
1893

1894
  """
1895
  _OP_REQP = []
1896
  REQ_MASTER = False
1897
  REQ_BGL = False
1898

    
1899
  def ExpandNames(self):
1900
    self.needed_locks = {}
1901

    
1902
  def CheckPrereq(self):
1903
    """No prerequsites needed for this LU.
1904

1905
    """
1906
    pass
1907

    
1908
  def Exec(self, feedback_fn):
1909
    """Return cluster config.
1910

1911
    """
1912
    cluster = self.cfg.GetClusterInfo()
1913
    result = {
1914
      "software_version": constants.RELEASE_VERSION,
1915
      "protocol_version": constants.PROTOCOL_VERSION,
1916
      "config_version": constants.CONFIG_VERSION,
1917
      "os_api_version": constants.OS_API_VERSION,
1918
      "export_version": constants.EXPORT_VERSION,
1919
      "architecture": (platform.architecture()[0], platform.machine()),
1920
      "name": cluster.cluster_name,
1921
      "master": cluster.master_node,
1922
      "default_hypervisor": cluster.default_hypervisor,
1923
      "enabled_hypervisors": cluster.enabled_hypervisors,
1924
      "hvparams": cluster.hvparams,
1925
      "beparams": cluster.beparams,
1926
      }
1927

    
1928
    return result
1929

    
1930

    
1931
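# Illustrative sketch, not part of the original module: LUQueryClusterInfo
# returns a plain dictionary, so callers can inspect it directly; a
# hypothetical helper:
def _ExampleClusterUsesHypervisor(cluster_info, hv_name):
  """Return True if hv_name appears in the enabled_hypervisors entry."""
  return hv_name in cluster_info.get("enabled_hypervisors", [])
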
class LUQueryConfigValues(NoHooksLU):
1932
  """Return configuration values.
1933

1934
  """
1935
  _OP_REQP = []
1936
  REQ_BGL = False
1937

    
1938
  def ExpandNames(self):
1939
    self.needed_locks = {}
1940

    
1941
    static_fields = ["cluster_name", "master_node", "drain_flag"]
1942
    _CheckOutputFields(static=static_fields,
1943
                       dynamic=[],
1944
                       selected=self.op.output_fields)
1945

    
1946
  def CheckPrereq(self):
1947
    """No prerequisites.
1948

1949
    """
1950
    pass
1951

    
1952
  def Exec(self, feedback_fn):
1953
    """Dump a representation of the cluster config to the standard output.
1954

1955
    """
1956
    values = []
1957
    for field in self.op.output_fields:
1958
      if field == "cluster_name":
1959
        entry = self.cfg.GetClusterName()
1960
      elif field == "master_node":
1961
        entry = self.cfg.GetMasterNode()
1962
      elif field == "drain_flag":
1963
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1964
      else:
1965
        raise errors.ParameterError(field)
1966
      values.append(entry)
1967
    return values
1968

    
1969

    
1970
class LUActivateInstanceDisks(NoHooksLU):
1971
  """Bring up an instance's disks.
1972

1973
  """
1974
  _OP_REQP = ["instance_name"]
1975
  REQ_BGL = False
1976

    
1977
  def ExpandNames(self):
1978
    self._ExpandAndLockInstance()
1979
    self.needed_locks[locking.LEVEL_NODE] = []
1980
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1981

    
1982
  def DeclareLocks(self, level):
1983
    if level == locking.LEVEL_NODE:
1984
      self._LockInstancesNodes()
1985

    
1986
  def CheckPrereq(self):
1987
    """Check prerequisites.
1988

1989
    This checks that the instance is in the cluster.
1990

1991
    """
1992
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1993
    assert self.instance is not None, \
1994
      "Cannot retrieve locked instance %s" % self.op.instance_name
1995

    
1996
  def Exec(self, feedback_fn):
1997
    """Activate the disks.
1998

1999
    """
2000
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2001
    if not disks_ok:
2002
      raise errors.OpExecError("Cannot activate block devices")
2003

    
2004
    return disks_info
2005

    
2006

    
2007
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2008
  """Prepare the block devices for an instance.
2009

2010
  This sets up the block devices on all nodes.
2011

2012
  Args:
2013
    instance: a ganeti.objects.Instance object
2014
    ignore_secondaries: if true, errors on secondary nodes won't result
2015
                        in an error return from the function
2016

2017
  Returns:
2018
    false if the operation failed
2019
    list of (host, instance_visible_name, node_visible_name) if the operation
2020
         succeeded, with the mapping from node devices to instance devices
2021
  """
2022
  device_info = []
2023
  disks_ok = True
2024
  iname = instance.name
2025
  # With the two-pass mechanism we try to reduce the window of
2026
  # opportunity for the race condition of switching DRBD to primary
2027
  # before handshaking occurred, but we do not eliminate it
2028

    
2029
  # The proper fix would be to wait (with some limits) until the
2030
  # connection has been made and drbd transitions from WFConnection
2031
  # into any other network-connected state (Connected, SyncTarget,
2032
  # SyncSource, etc.)
2033

    
2034
  # 1st pass, assemble on all nodes in secondary mode
2035
  for inst_disk in instance.disks:
2036
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2037
      lu.cfg.SetDiskID(node_disk, node)
2038
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2039
      if not result:
2040
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2041
                           " (is_primary=False, pass=1)",
2042
                           inst_disk.iv_name, node)
2043
        if not ignore_secondaries:
2044
          disks_ok = False
2045

    
2046
  # FIXME: race condition on drbd migration to primary
2047

    
2048
  # 2nd pass, do only the primary node
2049
  for inst_disk in instance.disks:
2050
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2051
      if node != instance.primary_node:
2052
        continue
2053
      lu.cfg.SetDiskID(node_disk, node)
2054
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2055
      if not result:
2056
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2057
                           " (is_primary=True, pass=2)",
2058
                           inst_disk.iv_name, node)
2059
        disks_ok = False
2060
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2061

    
2062
  # leave the disks configured for the primary node
2063
  # this is a workaround that would be fixed better by
2064
  # improving the logical/physical id handling
2065
  for disk in instance.disks:
2066
    lu.cfg.SetDiskID(disk, instance.primary_node)
2067

    
2068
  return disks_ok, device_info
2069

    
2070

    
2071
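# Illustrative sketch, not part of the original module: how a caller might
# consume the (disks_ok, device_info) pair documented above.  The 'lu' and
# 'instance' arguments are assumed to be a LogicalUnit and an
# objects.Instance, as elsewhere in this file.
def _ExampleLogAssembledDisks(lu, instance):
  """Assemble an instance's disks and log the node-to-device mapping."""
  disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block devices")
  for node, iv_name, node_dev in device_info:
    logging.info("%s: disk %s visible as %s", node, iv_name, node_dev)
  return device_info
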
def _StartInstanceDisks(lu, instance, force):
2072
  """Start the disks of an instance.
2073

2074
  """
2075
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2076
                                           ignore_secondaries=force)
2077
  if not disks_ok:
2078
    _ShutdownInstanceDisks(lu, instance)
2079
    if force is not None and not force:
2080
      lu.proc.LogWarning("", hint="If the message above refers to a"
2081
                         " secondary node,"
2082
                         " you can retry the operation using '--force'.")
2083
    raise errors.OpExecError("Disk consistency error")
2084

    
2085

    
2086
class LUDeactivateInstanceDisks(NoHooksLU):
2087
  """Shutdown an instance's disks.
2088

2089
  """
2090
  _OP_REQP = ["instance_name"]
2091
  REQ_BGL = False
2092

    
2093
  def ExpandNames(self):
2094
    self._ExpandAndLockInstance()
2095
    self.needed_locks[locking.LEVEL_NODE] = []
2096
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2097

    
2098
  def DeclareLocks(self, level):
2099
    if level == locking.LEVEL_NODE:
2100
      self._LockInstancesNodes()
2101

    
2102
  def CheckPrereq(self):
2103
    """Check prerequisites.
2104

2105
    This checks that the instance is in the cluster.
2106

2107
    """
2108
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2109
    assert self.instance is not None, \
2110
      "Cannot retrieve locked instance %s" % self.op.instance_name
2111

    
2112
  def Exec(self, feedback_fn):
2113
    """Deactivate the disks
2114

2115
    """
2116
    instance = self.instance
2117
    _SafeShutdownInstanceDisks(self, instance)
2118

    
2119

    
2120
def _SafeShutdownInstanceDisks(lu, instance):
2121
  """Shutdown block devices of an instance.
2122

2123
  This function checks if an instance is running, before calling
2124
  _ShutdownInstanceDisks.
2125

2126
  """
2127
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2128
                                      [instance.hypervisor])
2129
  ins_l = ins_l[instance.primary_node]
2130
  if not isinstance(ins_l, list):
2131
    raise errors.OpExecError("Can't contact node '%s'" %
2132
                             instance.primary_node)
2133

    
2134
  if instance.name in ins_l:
2135
    raise errors.OpExecError("Instance is running, can't shutdown"
2136
                             " block devices.")
2137

    
2138
  _ShutdownInstanceDisks(lu, instance)
2139

    
2140

    
2141
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2142
  """Shutdown block devices of an instance.
2143

2144
  This does the shutdown on all nodes of the instance.
2145

2146
  Errors on any secondary node always mark the shutdown as failed; errors
2147
  on the primary node are ignored only if ignore_primary is true.
2148

2149
  """
2150
  result = True
2151
  for disk in instance.disks:
2152
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2153
      lu.cfg.SetDiskID(top_disk, node)
2154
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2155
        logging.error("Could not shutdown block device %s on node %s",
2156
                      disk.iv_name, node)
2157
        if not ignore_primary or node != instance.primary_node:
2158
          result = False
2159
  return result
2160

    
2161

    
2162
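# Illustrative note, not part of the original code: the effect of the
# ignore_primary flag handled above.  A failure on any secondary node always
# makes _ShutdownInstanceDisks return False; a failure on the primary node
# is tolerated only when ignore_primary is True, as in the failover path:
#   _ShutdownInstanceDisks(self, instance, ignore_primary=True)
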
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2163
  """Checks if a node has enough free memory.
2164

2165
  This function checks if a given node has the needed amount of free
2166
  memory. In case the node has less memory or we cannot get the
2167
  information from the node, this function raises an OpPrereqError
2168
  exception.
2169

2170
  @type lu: C{LogicalUnit}
2171
  @param lu: a logical unit from which we get configuration data
2172
  @type node: C{str}
2173
  @param node: the node to check
2174
  @type reason: C{str}
2175
  @param reason: string to use in the error message
2176
  @type requested: C{int}
2177
  @param requested: the amount of memory in MiB to check for
2178
  @type hypervisor: C{str}
2179
  @param hypervisor: the hypervisor to ask for memory stats
2180
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2181
      we cannot check the node
2182

2183
  """
2184
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2185
  if not nodeinfo or not isinstance(nodeinfo, dict):
2186
    raise errors.OpPrereqError("Could not contact node %s for resource"
2187
                             " information" % (node,))
2188

    
2189
  free_mem = nodeinfo[node].get('memory_free')
2190
  if not isinstance(free_mem, int):
2191
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2192
                             " was '%s'" % (node, free_mem))
2193
  if requested > free_mem:
2194
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2195
                             " needed %s MiB, available %s MiB" %
2196
                             (node, reason, requested, free_mem))
2197

    
2198

    
2199
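# Illustrative sketch, not part of the original module: a typical call to
# the helper above, mirroring how the startup and failover LUs use it.  The
# 'lu' and 'instance' arguments are assumed to be a LogicalUnit and an
# objects.Instance.
def _ExampleCheckStartupMemory(lu, instance):
  """Raise OpPrereqError if the primary node lacks memory for the instance."""
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       bep[constants.BE_MEMORY], instance.hypervisor)
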
class LUStartupInstance(LogicalUnit):
2200
  """Starts an instance.
2201

2202
  """
2203
  HPATH = "instance-start"
2204
  HTYPE = constants.HTYPE_INSTANCE
2205
  _OP_REQP = ["instance_name", "force"]
2206
  REQ_BGL = False
2207

    
2208
  def ExpandNames(self):
2209
    self._ExpandAndLockInstance()
2210
    self.needed_locks[locking.LEVEL_NODE] = []
2211
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2212

    
2213
  def DeclareLocks(self, level):
2214
    if level == locking.LEVEL_NODE:
2215
      self._LockInstancesNodes()
2216

    
2217
  def BuildHooksEnv(self):
2218
    """Build hooks env.
2219

2220
    This runs on master, primary and secondary nodes of the instance.
2221

2222
    """
2223
    env = {
2224
      "FORCE": self.op.force,
2225
      }
2226
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2227
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2228
          list(self.instance.secondary_nodes))
2229
    return env, nl, nl
2230

    
2231
  def CheckPrereq(self):
2232
    """Check prerequisites.
2233

2234
    This checks that the instance is in the cluster.
2235

2236
    """
2237
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2238
    assert self.instance is not None, \
2239
      "Cannot retrieve locked instance %s" % self.op.instance_name
2240

    
2241
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2242
    # check bridges existence
2243
    _CheckInstanceBridgesExist(self, instance)
2244

    
2245
    _CheckNodeFreeMemory(self, instance.primary_node,
2246
                         "starting instance %s" % instance.name,
2247
                         bep[constants.BE_MEMORY], instance.hypervisor)
2248

    
2249
  def Exec(self, feedback_fn):
2250
    """Start the instance.
2251

2252
    """
2253
    instance = self.instance
2254
    force = self.op.force
2255
    extra_args = getattr(self.op, "extra_args", "")
2256

    
2257
    self.cfg.MarkInstanceUp(instance.name)
2258

    
2259
    node_current = instance.primary_node
2260

    
2261
    _StartInstanceDisks(self, instance, force)
2262

    
2263
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2264
      _ShutdownInstanceDisks(self, instance)
2265
      raise errors.OpExecError("Could not start instance")
2266

    
2267

    
2268
class LURebootInstance(LogicalUnit):
2269
  """Reboot an instance.
2270

2271
  """
2272
  HPATH = "instance-reboot"
2273
  HTYPE = constants.HTYPE_INSTANCE
2274
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2275
  REQ_BGL = False
2276

    
2277
  def ExpandNames(self):
2278
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2279
                                   constants.INSTANCE_REBOOT_HARD,
2280
                                   constants.INSTANCE_REBOOT_FULL]:
2281
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2282
                                  (constants.INSTANCE_REBOOT_SOFT,
2283
                                   constants.INSTANCE_REBOOT_HARD,
2284
                                   constants.INSTANCE_REBOOT_FULL))
2285
    self._ExpandAndLockInstance()
2286
    self.needed_locks[locking.LEVEL_NODE] = []
2287
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2288

    
2289
  def DeclareLocks(self, level):
2290
    if level == locking.LEVEL_NODE:
2291
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2292
      self._LockInstancesNodes(primary_only=primary_only)
2293

    
2294
  def BuildHooksEnv(self):
2295
    """Build hooks env.
2296

2297
    This runs on master, primary and secondary nodes of the instance.
2298

2299
    """
2300
    env = {
2301
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2302
      }
2303
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2304
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2305
          list(self.instance.secondary_nodes))
2306
    return env, nl, nl
2307

    
2308
  def CheckPrereq(self):
2309
    """Check prerequisites.
2310

2311
    This checks that the instance is in the cluster.
2312

2313
    """
2314
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2315
    assert self.instance is not None, \
2316
      "Cannot retrieve locked instance %s" % self.op.instance_name
2317

    
2318
    # check bridges existence
2319
    _CheckInstanceBridgesExist(self, instance)
2320

    
2321
  def Exec(self, feedback_fn):
2322
    """Reboot the instance.
2323

2324
    """
2325
    instance = self.instance
2326
    ignore_secondaries = self.op.ignore_secondaries
2327
    reboot_type = self.op.reboot_type
2328
    extra_args = getattr(self.op, "extra_args", "")
2329

    
2330
    node_current = instance.primary_node
2331

    
2332
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2333
                       constants.INSTANCE_REBOOT_HARD]:
2334
      if not self.rpc.call_instance_reboot(node_current, instance,
2335
                                           reboot_type, extra_args):
2336
        raise errors.OpExecError("Could not reboot instance")
2337
    else:
2338
      if not self.rpc.call_instance_shutdown(node_current, instance):
2339
        raise errors.OpExecError("could not shutdown instance for full reboot")
2340
      _ShutdownInstanceDisks(self, instance)
2341
      _StartInstanceDisks(self, instance, ignore_secondaries)
2342
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2343
        _ShutdownInstanceDisks(self, instance)
2344
        raise errors.OpExecError("Could not start instance for full reboot")
2345

    
2346
    self.cfg.MarkInstanceUp(instance.name)
2347

    
2348

    
2349
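# Illustrative note, not part of the original code: summary of how the
# reboot types validated above are handled by LURebootInstance.Exec:
#   INSTANCE_REBOOT_SOFT / INSTANCE_REBOOT_HARD -> a single
#     call_instance_reboot RPC on the primary node
#   INSTANCE_REBOOT_FULL -> instance shutdown, disk deactivation and
#     reactivation, then a fresh call_instance_start (a stop/start cycle)
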
class LUShutdownInstance(LogicalUnit):
2350
  """Shutdown an instance.
2351

2352
  """
2353
  HPATH = "instance-stop"
2354
  HTYPE = constants.HTYPE_INSTANCE
2355
  _OP_REQP = ["instance_name"]
2356
  REQ_BGL = False
2357

    
2358
  def ExpandNames(self):
2359
    self._ExpandAndLockInstance()
2360
    self.needed_locks[locking.LEVEL_NODE] = []
2361
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2362

    
2363
  def DeclareLocks(self, level):
2364
    if level == locking.LEVEL_NODE:
2365
      self._LockInstancesNodes()
2366

    
2367
  def BuildHooksEnv(self):
2368
    """Build hooks env.
2369

2370
    This runs on master, primary and secondary nodes of the instance.
2371

2372
    """
2373
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2374
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2375
          list(self.instance.secondary_nodes))
2376
    return env, nl, nl
2377

    
2378
  def CheckPrereq(self):
2379
    """Check prerequisites.
2380

2381
    This checks that the instance is in the cluster.
2382

2383
    """
2384
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2385
    assert self.instance is not None, \
2386
      "Cannot retrieve locked instance %s" % self.op.instance_name
2387

    
2388
  def Exec(self, feedback_fn):
2389
    """Shutdown the instance.
2390

2391
    """
2392
    instance = self.instance
2393
    node_current = instance.primary_node
2394
    self.cfg.MarkInstanceDown(instance.name)
2395
    if not self.rpc.call_instance_shutdown(node_current, instance):
2396
      self.proc.LogWarning("Could not shutdown instance")
2397

    
2398
    _ShutdownInstanceDisks(self, instance)
2399

    
2400

    
2401
class LUReinstallInstance(LogicalUnit):
2402
  """Reinstall an instance.
2403

2404
  """
2405
  HPATH = "instance-reinstall"
2406
  HTYPE = constants.HTYPE_INSTANCE
2407
  _OP_REQP = ["instance_name"]
2408
  REQ_BGL = False
2409

    
2410
  def ExpandNames(self):
2411
    self._ExpandAndLockInstance()
2412
    self.needed_locks[locking.LEVEL_NODE] = []
2413
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2414

    
2415
  def DeclareLocks(self, level):
2416
    if level == locking.LEVEL_NODE:
2417
      self._LockInstancesNodes()
2418

    
2419
  def BuildHooksEnv(self):
2420
    """Build hooks env.
2421

2422
    This runs on master, primary and secondary nodes of the instance.
2423

2424
    """
2425
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2426
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2427
          list(self.instance.secondary_nodes))
2428
    return env, nl, nl
2429

    
2430
  def CheckPrereq(self):
2431
    """Check prerequisites.
2432

2433
    This checks that the instance is in the cluster and is not running.
2434

2435
    """
2436
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2437
    assert instance is not None, \
2438
      "Cannot retrieve locked instance %s" % self.op.instance_name
2439

    
2440
    if instance.disk_template == constants.DT_DISKLESS:
2441
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2442
                                 self.op.instance_name)
2443
    if instance.status != "down":
2444
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2445
                                 self.op.instance_name)
2446
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2447
                                              instance.name,
2448
                                              instance.hypervisor)
2449
    if remote_info:
2450
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2451
                                 (self.op.instance_name,
2452
                                  instance.primary_node))
2453

    
2454
    self.op.os_type = getattr(self.op, "os_type", None)
2455
    if self.op.os_type is not None:
2456
      # OS verification
2457
      pnode = self.cfg.GetNodeInfo(
2458
        self.cfg.ExpandNodeName(instance.primary_node))
2459
      if pnode is None:
2460
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2461
                                   self.op.pnode)
2462
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2463
      if not os_obj:
2464
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2465
                                   " primary node"  % self.op.os_type)
2466

    
2467
    self.instance = instance
2468

    
2469
  def Exec(self, feedback_fn):
2470
    """Reinstall the instance.
2471

2472
    """
2473
    inst = self.instance
2474

    
2475
    if self.op.os_type is not None:
2476
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2477
      inst.os = self.op.os_type
2478
      self.cfg.Update(inst)
2479

    
2480
    _StartInstanceDisks(self, inst, None)
2481
    try:
2482
      feedback_fn("Running the instance OS create scripts...")
2483
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2484
                                           "sda", "sdb"):
2485
        raise errors.OpExecError("Could not install OS for instance %s"
2486
                                 " on node %s" %
2487
                                 (inst.name, inst.primary_node))
2488
    finally:
2489
      _ShutdownInstanceDisks(self, inst)
2490

    
2491

    
2492
class LURenameInstance(LogicalUnit):
2493
  """Rename an instance.
2494

2495
  """
2496
  HPATH = "instance-rename"
2497
  HTYPE = constants.HTYPE_INSTANCE
2498
  _OP_REQP = ["instance_name", "new_name"]
2499

    
2500
  def BuildHooksEnv(self):
2501
    """Build hooks env.
2502

2503
    This runs on master, primary and secondary nodes of the instance.
2504

2505
    """
2506
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2507
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2508
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2509
          list(self.instance.secondary_nodes))
2510
    return env, nl, nl
2511

    
2512
  def CheckPrereq(self):
2513
    """Check prerequisites.
2514

2515
    This checks that the instance is in the cluster and is not running.
2516

2517
    """
2518
    instance = self.cfg.GetInstanceInfo(
2519
      self.cfg.ExpandInstanceName(self.op.instance_name))
2520
    if instance is None:
2521
      raise errors.OpPrereqError("Instance '%s' not known" %
2522
                                 self.op.instance_name)
2523
    if instance.status != "down":
2524
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2525
                                 self.op.instance_name)
2526
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2527
                                              instance.name,
2528
                                              instance.hypervisor)
2529
    if remote_info:
2530
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2531
                                 (self.op.instance_name,
2532
                                  instance.primary_node))
2533
    self.instance = instance
2534

    
2535
    # new name verification
2536
    name_info = utils.HostInfo(self.op.new_name)
2537

    
2538
    self.op.new_name = new_name = name_info.name
2539
    instance_list = self.cfg.GetInstanceList()
2540
    if new_name in instance_list:
2541
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2542
                                 new_name)
2543

    
2544
    if not getattr(self.op, "ignore_ip", False):
2545
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2546
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2547
                                   (name_info.ip, new_name))
2548

    
2549

    
2550
  def Exec(self, feedback_fn):
2551
    """Reinstall the instance.
2552

2553
    """
2554
    inst = self.instance
2555
    old_name = inst.name
2556

    
2557
    if inst.disk_template == constants.DT_FILE:
2558
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2559

    
2560
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2561
    # Change the instance lock. This is definitely safe while we hold the BGL
2562
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2563
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2564

    
2565
    # re-read the instance from the configuration after rename
2566
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2567

    
2568
    if inst.disk_template == constants.DT_FILE:
2569
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2570
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2571
                                                     old_file_storage_dir,
2572
                                                     new_file_storage_dir)
2573

    
2574
      if not result:
2575
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2576
                                 " directory '%s' to '%s' (but the instance"
2577
                                 " has been renamed in Ganeti)" % (
2578
                                 inst.primary_node, old_file_storage_dir,
2579
                                 new_file_storage_dir))
2580

    
2581
      if not result[0]:
2582
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2583
                                 " (but the instance has been renamed in"
2584
                                 " Ganeti)" % (old_file_storage_dir,
2585
                                               new_file_storage_dir))
2586

    
2587
    _StartInstanceDisks(self, inst, None)
2588
    try:
2589
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2590
                                               old_name):
2591
        msg = ("Could not run OS rename script for instance %s on node %s"
2592
               " (but the instance has been renamed in Ganeti)" %
2593
               (inst.name, inst.primary_node))
2594
        self.proc.LogWarning(msg)
2595
    finally:
2596
      _ShutdownInstanceDisks(self, inst)
2597

    
2598

    
2599
class LURemoveInstance(LogicalUnit):
2600
  """Remove an instance.
2601

2602
  """
2603
  HPATH = "instance-remove"
2604
  HTYPE = constants.HTYPE_INSTANCE
2605
  _OP_REQP = ["instance_name", "ignore_failures"]
2606
  REQ_BGL = False
2607

    
2608
  def ExpandNames(self):
2609
    self._ExpandAndLockInstance()
2610
    self.needed_locks[locking.LEVEL_NODE] = []
2611
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2612

    
2613
  def DeclareLocks(self, level):
2614
    if level == locking.LEVEL_NODE:
2615
      self._LockInstancesNodes()
2616

    
2617
  def BuildHooksEnv(self):
2618
    """Build hooks env.
2619

2620
    This runs on master, primary and secondary nodes of the instance.
2621

2622
    """
2623
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2624
    nl = [self.cfg.GetMasterNode()]
2625
    return env, nl, nl
2626

    
2627
  def CheckPrereq(self):
2628
    """Check prerequisites.
2629

2630
    This checks that the instance is in the cluster.
2631

2632
    """
2633
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2634
    assert self.instance is not None, \
2635
      "Cannot retrieve locked instance %s" % self.op.instance_name
2636

    
2637
  def Exec(self, feedback_fn):
2638
    """Remove the instance.
2639

2640
    """
2641
    instance = self.instance
2642
    logging.info("Shutting down instance %s on node %s",
2643
                 instance.name, instance.primary_node)
2644

    
2645
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2646
      if self.op.ignore_failures:
2647
        feedback_fn("Warning: can't shutdown instance")
2648
      else:
2649
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2650
                                 (instance.name, instance.primary_node))
2651

    
2652
    logging.info("Removing block devices for instance %s", instance.name)
2653

    
2654
    if not _RemoveDisks(self, instance):
2655
      if self.op.ignore_failures:
2656
        feedback_fn("Warning: can't remove instance's disks")
2657
      else:
2658
        raise errors.OpExecError("Can't remove instance's disks")
2659

    
2660
    logging.info("Removing instance %s out of cluster config", instance.name)
2661

    
2662
    self.cfg.RemoveInstance(instance.name)
2663
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2664

    
2665

    
2666
class LUQueryInstances(NoHooksLU):
2667
  """Logical unit for querying instances.
2668

2669
  """
2670
  _OP_REQP = ["output_fields", "names"]
2671
  REQ_BGL = False
2672

    
2673
  def ExpandNames(self):
2674
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2675
    hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2676
    bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2677
    self.static_fields = frozenset([
2678
      "name", "os", "pnode", "snodes",
2679
      "admin_state", "admin_ram",
2680
      "disk_template", "ip", "mac", "bridge",
2681
      "sda_size", "sdb_size", "vcpus", "tags",
2682
      "network_port",
2683
      "serial_no", "hypervisor", "hvparams",
2684
      ] + hvp + bep)
2685

    
2686
    _CheckOutputFields(static=self.static_fields,
2687
                       dynamic=self.dynamic_fields,
2688
                       selected=self.op.output_fields)
2689

    
2690
    self.needed_locks = {}
2691
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2692
    self.share_locks[locking.LEVEL_NODE] = 1
2693

    
2694
    if self.op.names:
2695
      self.wanted = _GetWantedInstances(self, self.op.names)
2696
    else:
2697
      self.wanted = locking.ALL_SET
2698

    
2699
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2700
    if self.do_locking:
2701
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2702
      self.needed_locks[locking.LEVEL_NODE] = []
2703
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2704

    
2705
  def DeclareLocks(self, level):
2706
    if level == locking.LEVEL_NODE and self.do_locking:
2707
      self._LockInstancesNodes()
2708

    
2709
  def CheckPrereq(self):
2710
    """Check prerequisites.
2711

2712
    """
2713
    pass
2714

    
2715
  def Exec(self, feedback_fn):
2716
    """Computes the list of nodes and their attributes.
2717

2718
    """
2719
    all_info = self.cfg.GetAllInstancesInfo()
2720
    if self.do_locking:
2721
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2722
    elif self.wanted != locking.ALL_SET:
2723
      instance_names = self.wanted
2724
      missing = set(instance_names).difference(all_info.keys())
2725
      if missing:
2726
        raise errors.OpExecError(
2727
          "Some instances were removed before retrieving their data: %s"
2728
          % missing)
2729
    else:
2730
      instance_names = all_info.keys()
2731

    
2732
    instance_names = utils.NiceSort(instance_names)
2733
    instance_list = [all_info[iname] for iname in instance_names]
2734

    
2735
    # begin data gathering
2736

    
2737
    nodes = frozenset([inst.primary_node for inst in instance_list])
2738
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2739

    
2740
    bad_nodes = []
2741
    if self.dynamic_fields.intersection(self.op.output_fields):
2742
      live_data = {}
2743
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2744
      for name in nodes:
2745
        result = node_data[name]
2746
        if result:
2747
          live_data.update(result)
2748
        elif result is False:
2749
          bad_nodes.append(name)
2750
        # else no instance is alive
2751
    else:
2752
      live_data = dict([(name, {}) for name in instance_names])
2753

    
2754
    # end data gathering
2755

    
2756
    HVPREFIX = "hv/"
2757
    BEPREFIX = "be/"
2758
    output = []
2759
    for instance in instance_list:
2760
      iout = []
2761
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2762
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2763
      for field in self.op.output_fields:
2764
        if field == "name":
2765
          val = instance.name
2766
        elif field == "os":
2767
          val = instance.os
2768
        elif field == "pnode":
2769
          val = instance.primary_node
2770
        elif field == "snodes":
2771
          val = list(instance.secondary_nodes)
2772
        elif field == "admin_state":
2773
          val = (instance.status != "down")
2774
        elif field == "oper_state":
2775
          if instance.primary_node in bad_nodes:
2776
            val = None
2777
          else:
2778
            val = bool(live_data.get(instance.name))
2779
        elif field == "status":
2780
          if instance.primary_node in bad_nodes:
2781
            val = "ERROR_nodedown"
2782
          else:
2783
            running = bool(live_data.get(instance.name))
2784
            if running:
2785
              if instance.status != "down":
2786
                val = "running"
2787
              else:
2788
                val = "ERROR_up"
2789
            else:
2790
              if instance.status != "down":
2791
                val = "ERROR_down"
2792
              else:
2793
                val = "ADMIN_down"
2794
        elif field == "oper_ram":
2795
          if instance.primary_node in bad_nodes:
2796
            val = None
2797
          elif instance.name in live_data:
2798
            val = live_data[instance.name].get("memory", "?")
2799
          else:
2800
            val = "-"
2801
        elif field == "disk_template":
2802
          val = instance.disk_template
2803
        elif field == "ip":
2804
          val = instance.nics[0].ip
2805
        elif field == "bridge":
2806
          val = instance.nics[0].bridge
2807
        elif field == "mac":
2808
          val = instance.nics[0].mac
2809
        elif field == "sda_size" or field == "sdb_size":
2810
          disk = instance.FindDisk(field[:3])
2811
          if disk is None:
2812
            val = None
2813
          else:
2814
            val = disk.size
2815
        elif field == "tags":
2816
          val = list(instance.GetTags())
2817
        elif field == "serial_no":
2818
          val = instance.serial_no
2819
        elif field == "network_port":
2820
          val = instance.network_port
2821
        elif field == "hypervisor":
2822
          val = instance.hypervisor
2823
        elif field == "hvparams":
2824
          val = i_hv
2825
        elif (field.startswith(HVPREFIX) and
2826
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2827
          val = i_hv.get(field[len(HVPREFIX):], None)
2828
        elif field == "beparams":
2829
          val = i_be
2830
        elif (field.startswith(BEPREFIX) and
2831
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2832
          val = i_be.get(field[len(BEPREFIX):], None)
2833
        else:
2834
          raise errors.ParameterError(field)
2835
        iout.append(val)
2836
      output.append(iout)
2837

    
2838
    return output
2839

    
2840

    
2841
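# Illustrative note, not part of the original code: the "status" column
# computed above combines the configured state with the live data:
#   primary node unreachable          -> "ERROR_nodedown"
#   running and configured up         -> "running"
#   running but configured down       -> "ERROR_up"
#   not running but configured up     -> "ERROR_down"
#   not running and configured down   -> "ADMIN_down"
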
class LUFailoverInstance(LogicalUnit):
2842
  """Failover an instance.
2843

2844
  """
2845
  HPATH = "instance-failover"
2846
  HTYPE = constants.HTYPE_INSTANCE
2847
  _OP_REQP = ["instance_name", "ignore_consistency"]
2848
  REQ_BGL = False
2849

    
2850
  def ExpandNames(self):
2851
    self._ExpandAndLockInstance()
2852
    self.needed_locks[locking.LEVEL_NODE] = []
2853
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2854

    
2855
  def DeclareLocks(self, level):
2856
    if level == locking.LEVEL_NODE:
2857
      self._LockInstancesNodes()
2858

    
2859
  def BuildHooksEnv(self):
2860
    """Build hooks env.
2861

2862
    This runs on master, primary and secondary nodes of the instance.
2863

2864
    """
2865
    env = {
2866
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2867
      }
2868
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2869
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2870
    return env, nl, nl
2871

    
2872
  def CheckPrereq(self):
2873
    """Check prerequisites.
2874

2875
    This checks that the instance is in the cluster.
2876

2877
    """
2878
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2879
    assert self.instance is not None, \
2880
      "Cannot retrieve locked instance %s" % self.op.instance_name
2881

    
2882
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2883
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2884
      raise errors.OpPrereqError("Instance's disk layout is not"
2885
                                 " network mirrored, cannot failover.")
2886

    
2887
    secondary_nodes = instance.secondary_nodes
2888
    if not secondary_nodes:
2889
      raise errors.ProgrammerError("no secondary node but using "
2890
                                   "a mirrored disk template")
2891

    
2892
    target_node = secondary_nodes[0]
2893
    # check memory requirements on the secondary node
2894
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2895
                         instance.name, bep[constants.BE_MEMORY],
2896
                         instance.hypervisor)
2897

    
2898
    # check bridge existence
2899
    brlist = [nic.bridge for nic in instance.nics]
2900
    if not self.rpc.call_bridges_exist(target_node, brlist):
2901
      raise errors.OpPrereqError("One or more target bridges %s does not"
2902
                                 " exist on destination node '%s'" %
2903
                                 (brlist, target_node))
2904

    
2905
  def Exec(self, feedback_fn):
2906
    """Failover an instance.
2907

2908
    The failover is done by shutting it down on its present node and
2909
    starting it on the secondary.
2910

2911
    """
2912
    instance = self.instance
2913

    
2914
    source_node = instance.primary_node
2915
    target_node = instance.secondary_nodes[0]
2916

    
2917
    feedback_fn("* checking disk consistency between source and target")
2918
    for dev in instance.disks:
2919
      # for drbd, these are drbd over lvm
2920
      if not _CheckDiskConsistency(self, dev, target_node, False):
2921
        if instance.status == "up" and not self.op.ignore_consistency:
2922
          raise errors.OpExecError("Disk %s is degraded on target node,"
2923
                                   " aborting failover." % dev.iv_name)
2924

    
2925
    feedback_fn("* shutting down instance on source node")
2926
    logging.info("Shutting down instance %s on node %s",
2927
                 instance.name, source_node)
2928

    
2929
    if not self.rpc.call_instance_shutdown(source_node, instance):
2930
      if self.op.ignore_consistency:
2931
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
2932
                             " Proceeding"
2933
                             " anyway. Please make sure node %s is down",
2934
                             instance.name, source_node, source_node)
2935
      else:
2936
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2937
                                 (instance.name, source_node))
2938

    
2939
    feedback_fn("* deactivating the instance's disks on source node")
2940
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2941
      raise errors.OpExecError("Can't shut down the instance's disks.")
2942

    
2943
    instance.primary_node = target_node
2944
    # distribute new instance config to the other nodes
2945
    self.cfg.Update(instance)
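    # note: at this point the cluster configuration already names target_node
    # as the instance's primary node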

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
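  # the logical_id built below packs, in order, the two node names, the
  # network port, the DRBD minor on each node and the shared secret; the
  # 128 MB LV above is the metadata volume for this DRBD device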
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)
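    # minor_pa/minor_pb are the minors allocated on the primary node and
    # minor_sa/minor_sb the ones on the secondary, matching the node list
    # passed to AllocateDRBDMinor above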

    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                      ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }
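  # e.g. a drbd8 instance with disk_size=10240 and swap_size=4096 needs
  # 10240 + 4096 + 256 = 14592 MB of free space in the volume group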

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
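  # each node is expected to answer with a (success, message) pair; anything
  # else is treated as a failure to get information from that node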
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_size",
              "disk_template", "swap_size", "mode", "start",
              "wait_for_sync", "ip_check", "mac",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == constants.VALUE_AUTO:
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

      if self.op.mac == constants.VALUE_AUTO:
        old_name = export_info.get(constants.INISECT_INS, 'name')
        if self.op.instance_name == old_name:
          # FIXME: adjust every nic, when we'll be able to create instances
          # with more than one
          if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
            self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, [src_image],
                                                         cluster_name)
        if import_result[0]:
          raise errors.OpExecError("Could not import disks for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))


    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev, new_minor in zip(instance.disks, minors):
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
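      # the drbd logical_id is (node_a, node_b, port, minor_a, minor_b,
      # secret): keep the primary's entries and put the new node with its
      # freshly allocated minor in the old secondary's slot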
4185
      if pri_node == dev.logical_id[0]:
4186
        new_logical_id = (pri_node, new_node,
4187
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4188
                          dev.logical_id[5])
4189
      else:
4190
        new_logical_id = (new_node, pri_node,
4191
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4192
                          dev.logical_id[5])
4193
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4194
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4195
                    new_logical_id)
4196
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4197
                              logical_id=new_logical_id,
4198
                              children=dev.children)
4199
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4200
                                        new_drbd, False,
4201
                                        _GetInstanceInfoText(instance)):
4202
        self.cfg.ReleaseDRBDMinors(instance.name)
4203
        raise errors.OpExecError("Failed to create new DRBD on"
4204
                                 " node '%s'" % new_node)
4205

    
4206
    for dev in instance.disks:
4207
      # we have new devices, shutdown the drbd on the old secondary
4208
      info("shutting down drbd for %s on old node" % dev.iv_name)
4209
      cfg.SetDiskID(dev, old_node)
4210
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4211
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4212
                hint="Please cleanup this device manually as soon as possible")
4213

    
4214
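    # From here on the switchover proper: disconnect the primary's DRBD
    # devices from the old secondary, repoint the configuration at the new
    # logical ids, reconnect against the new secondary, wait for the resync
    # and finally drop the old LVs on the old node.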
    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # now we can remove the temp minors, as the new values have been
    # written to the config file (and are therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


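# LUGrowDisk extends a single disk of an instance by op.amount (in MiB) on
# every node that holds it (primary and any secondaries) and then optionally
# waits for the resync to finish.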
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

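  # The per-node call_blockdev_grow RPC is expected to return a (status,
  # message) pair; anything else, or a false status, is treated as a failed
  # grow request for that node.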
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("Grow request failed on node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("Grow request failed on node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "hvparams"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.ip, self.bridge, self.mac]
    if (all_parms.count(None) == len(all_parms) and
        not self.op.hvparams and
        not self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
      val = self.op.beparams.get(item, None)
      if val is not None:
        try:
          val = int(val)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
        self.op.beparams[item] = val
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

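    # In both parameter dicts below a value of None means "remove this key
    # from the instance-level overrides", so the cluster-level default
    # (merged back in via FillDict) applies again.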
    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val is None:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return self.rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

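  # Export flow implemented below: optionally shut the instance down,
  # snapshot every disk on the primary node, restart the instance if needed,
  # copy the snapshots to the target node, remove the snapshots, finalize the
  # export on the target and finally prune older exports of the same instance
  # from the other nodes.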
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

        if not new_dev_name:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name),
                                 physical_id=(vgname, new_dev_name),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
      if dev:
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                             instance, cluster_name):
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        if not self.rpc.call_blockdev_remove(src_node, dev):
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # On one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


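# The tag LUs below operate on one of three kinds of objects (cluster, node
# or instance); op.kind selects the target and op.name is expanded to the
# canonical node/instance name before any locks are taken.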
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the matching (path, tag) pairs.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


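# IAllocator protocol in brief: the LU side serializes a dictionary with the
# cluster-level keys "version", "cluster_name", "cluster_tags" and
# "enable_hypervisors", per-node data under "nodes", per-instance data under
# "instances" and the operation-specific "request", and hands it to an
# external allocator script, which answers with "success", "info" and
# "nodes".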
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg (via the owning LU) that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }

    i_list = []
    cluster = cfg.GetClusterInfo()
    for iname in cfg.GetInstanceList():
      i_obj = cfg.GetInstanceInfo(iname)
      i_list.append((i_obj, cluster.FillBE(i_obj)))

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    # FIXME: here we have only one hypervisor information, but
    # instance can belong to different hypervisors
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           cfg.GetHypervisorType())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

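  # An "allocate" request as built by _AddNewInstance looks roughly like the
  # following (values are illustrative only):
  #   {"type": "allocate", "name": "inst1.example.com",
  #    "disk_template": "drbd", "tags": [], "os": "debian-etch",
  #    "vcpus": 1, "memory": 512,
  #    "disks": [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #    "disk_space_total": <from _ComputeDiskSize>, "nics": [...],
  #    "required_nodes": 2}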
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

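  # The allocator's reply is expected to be a serialized dict with at least
  # the keys "success", "info" and "nodes", where "nodes" must be a list of
  # node names, e.g. {"success": true, "info": "", "nodes": ["node2"]}
  # (example values only).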
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the test direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result