# lib/cmdlib.py @ 1a05d855
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as that
    prefix is added by the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If the hook should run on no nodes, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
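
  # Editorial example (not part of the original module): a minimal
  # BuildHooksEnv override could look like the sketch below, for a
  # hypothetical LU whose opcode carries a node_name field. The env keys are
  # left unprefixed; the hooks runner adds the GANETI_ prefix itself.
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name}
  #     mn = self.cfg.GetMasterNode()
  #     return env, [mn], [mn]      # env, pre-hook nodes, post-hook nodes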

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
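
  # Editorial sketch (not from the original file): how a concurrent LU is
  # expected to combine ExpandNames, DeclareLocks and _LockInstancesNodes,
  # following the docstrings above. The class name and opcode field are
  # hypothetical.
  #
  #   class LUExampleInstanceOp(LogicalUnit):
  #     _OP_REQP = ["instance_name"]
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self._ExpandAndLockInstance()
  #       self.needed_locks[locking.LEVEL_NODE] = []
  #       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #     def DeclareLocks(self, level):
  #       if level == locking.LEVEL_NODE:
  #         self._LockInstancesNodes()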


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: Non-empty list of node names (strings) to expand

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
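
# Editorial example: _GetWantedNodes expands short names and returns them
# nicely sorted, e.g. (hostnames are illustrative)
#   _GetWantedNodes(lu, ["node2", "node1"])
#     -> ["node1.example.tld", "node2.example.tld"]
# while an unknown name raises errors.OpPrereqError.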


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instance names (strings), or an empty list for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the user

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
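
# Editorial example: with static=["name", "pinst_cnt"] and dynamic=["mfree"],
# selecting ["name", "mfree"] passes silently, whereas selecting
# ["name", "bogus"] raises OpPrereqError("Unknown output fields selected:
# bogus").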


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
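
# Editorial example (values are made up): for an instance "inst1" with one NIC
# on bridge xen-br0, the function above yields, among other keys:
#   INSTANCE_NAME=inst1, INSTANCE_PRIMARY=node1.example.tld,
#   INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_IP=192.0.2.10,
#   INSTANCE_NIC0_BRIDGE=xen-br0, INSTANCE_NIC0_HWADDR=aa:00:00:00:00:01
# As noted in BuildHooksEnv's docstring, the hooks runner then adds the
# GANETI_ prefix to each key before running the hooks.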


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
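
  # Editorial worked example (illustrative numbers): if node C has
  # mfree = 2048 MB and is secondary for two auto-balanced instances whose
  # primary is node A, with BE_MEMORY of 1024 MB and 1536 MB, then a failure
  # of A would require 1024 + 1536 = 2560 MB on C, exceeding its 2048 MB of
  # free memory, so the loop above reports an N+1 error for the pair (C, A).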

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
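
# Editorial example: for a mirrored disk whose children are LD_LV logical
# volumes, the recursion above returns True via the children; for a disk with
# no children and a non-LD_LV dev_type it returns False.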


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name is not None, check the given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)
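
  # Editorial example of the hvparams merge above (hypervisor name and keys
  # are illustrative): if cluster.hvparams contains
  #   {"xen-pvm": {"kernel_path": "/boot/vmlinuz"}}
  # and the opcode passes {"xen-pvm": {"root_path": "/dev/sda1"}}, then
  # new_hvparams["xen-pvm"] ends up with both keys, and only hypervisors
  # named in the opcode (or newly enabled) are re-validated on all nodes.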

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
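
# Editorial note, derived from the function above: the status tuple returned
# by call_blockdev_find is indexed here with 5 for the overall is_degraded
# flag and 6 for the local-disk (ldisk) status; the ldisk argument merely
# selects which of the two fields is tested.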


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           nodes as keys and lists of OS objects as values
           e.g. {"debian-etch": {"node1": [<object>,...],
                                 "node2": [<object>,]}
                }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes, if
    # non-empty; if empty, there's no validation to do
    pass
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Computes the list of nodes and their attributes.
1521

1522
    """
1523
    all_info = self.cfg.GetAllNodesInfo()
1524
    if self.do_locking:
1525
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1526
    elif self.wanted != locking.ALL_SET:
1527
      nodenames = self.wanted
1528
      missing = set(nodenames).difference(all_info.keys())
1529
      if missing:
1530
        raise errors.OpExecError(
1531
          "Some nodes were removed before retrieving their data: %s" % missing)
1532
    else:
1533
      nodenames = all_info.keys()
1534

    
1535
    nodenames = utils.NiceSort(nodenames)
1536
    nodelist = [all_info[name] for name in nodenames]
1537

    
1538
    # begin data gathering
1539

    
1540
    if self.dynamic_fields.intersection(self.op.output_fields):
1541
      live_data = {}
1542
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1543
                                          self.cfg.GetHypervisorType())
1544
      for name in nodenames:
1545
        nodeinfo = node_data.get(name, None)
1546
        if nodeinfo:
1547
          live_data[name] = {
1548
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1549
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1550
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1551
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1552
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1553
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1554
            "bootid": nodeinfo['bootid'],
1555
            }
1556
        else:
1557
          live_data[name] = {}
1558
    else:
1559
      live_data = dict.fromkeys(nodenames, {})
1560

    
1561
    node_to_primary = dict([(name, set()) for name in nodenames])
1562
    node_to_secondary = dict([(name, set()) for name in nodenames])
1563

    
1564
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1565
                             "sinst_cnt", "sinst_list"))
1566
    if inst_fields & frozenset(self.op.output_fields):
1567
      instancelist = self.cfg.GetInstanceList()
1568

    
1569
      for instance_name in instancelist:
1570
        inst = self.cfg.GetInstanceInfo(instance_name)
1571
        if inst.primary_node in node_to_primary:
1572
          node_to_primary[inst.primary_node].add(inst.name)
1573
        for secnode in inst.secondary_nodes:
1574
          if secnode in node_to_secondary:
1575
            node_to_secondary[secnode].add(inst.name)
1576

    
1577
    # end data gathering
1578

    
1579
    output = []
1580
    for node in nodelist:
1581
      node_output = []
1582
      for field in self.op.output_fields:
1583
        if field == "name":
1584
          val = node.name
1585
        elif field == "pinst_list":
1586
          val = list(node_to_primary[node.name])
1587
        elif field == "sinst_list":
1588
          val = list(node_to_secondary[node.name])
1589
        elif field == "pinst_cnt":
1590
          val = len(node_to_primary[node.name])
1591
        elif field == "sinst_cnt":
1592
          val = len(node_to_secondary[node.name])
1593
        elif field == "pip":
1594
          val = node.primary_ip
1595
        elif field == "sip":
1596
          val = node.secondary_ip
1597
        elif field == "tags":
1598
          val = list(node.GetTags())
1599
        elif field == "serial_no":
1600
          val = node.serial_no
1601
        elif field in self.dynamic_fields:
1602
          val = live_data[node.name].get(field, None)
1603
        else:
1604
          raise errors.ParameterError(field)
1605
        node_output.append(val)
1606
      output.append(node_output)
1607

    
1608
    return output
1609

    
1610

    
1611
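
# Illustrative sketch (assumed names and values, not from a real cluster):
# with op.output_fields = ["name", "pinst_cnt", "mfree"], LUQueryNodes.Exec
# above returns one row per node, with the fields in the requested order.
# Since "mfree" is a dynamic field, the nodes are locked and queried live:
#
#   [["node1.example.com", 2, 1024],
#    ["node2.example.com", 0, 3072]]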


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
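
# Illustrative sketch (assumed values): a query with
# output_fields = ["node", "vg", "name", "size", "instance"] yields one row
# of strings per logical volume, e.g.:
#
#   ["node1.example.com", "xenvg", "<uuid>.sda_data", "10240", "instance1"]
#
# Note that LUQueryNodeVolumes.Exec above passes every value through
# str(val), unlike LUQueryNodes which returns native Python types.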


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_has_ip_address(new_node.name,
                                               new_node.secondary_ip):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
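
# Illustrative sketch (simplified restatement, not used by the code above):
# the single/dual-homed consistency rule enforced in LUAddNode.CheckPrereq
# boils down to the following predicate; both the master and the new node
# must either use a separate secondary (replication) IP or neither may:
#
#   def _homing_matches(master_pip, master_sip, new_pip, new_sip):
#     return (master_pip == master_sip) == (new_pip == new_sip)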


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": cluster.hvparams,
      "beparams": cluster.beparams,
      }

    return result
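
# Illustrative sketch (keys taken from the dict built above, values are
# placeholders only): the result returned by LUQueryClusterInfo.Exec might
# look like:
#
#   {
#     "software_version": "1.2.0",
#     "protocol_version": 13,
#     "name": "cluster.example.com",
#     "master": "node1.example.com",
#     "enabled_hypervisors": ["xen-pvm"],
#     ...
#   }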


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

    static_fields = ["cluster_name", "master_node", "drain_flag"]
    _CheckOutputFields(static=static_fields,
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values
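
# Illustrative sketch (assumed values): with
# op.output_fields = ["cluster_name", "drain_flag"], LUQueryConfigValues.Exec
# above returns the entries in the same order as requested, e.g.:
#
#   ["cluster.example.com", False]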


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple of (disks_ok, device_info), where disks_ok is false if the
    operation failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1)",
                           inst_disk.iv_name, node)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2)",
                           inst_disk.iv_name, node)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
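
# Illustrative usage sketch: callers treat the return value of
# _AssembleInstanceDisks above as a (success, mapping) pair. The feedback
# loop below is only an example; the real callers in this module (e.g.
# LUActivateInstanceDisks) simply propagate device_info:
#
#   disks_ok, device_info = _AssembleInstanceDisks(self, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node_name, iv_name, node_dev in device_info:
#     feedback_fn("  - %s on %s is %s" % (iv_name, node_name, node_dev))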


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (they are still logged, but do not cause a failure result).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
        logging.error("Could not shutdown block device %s on node %s",
                      disk.iv_name, node)
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
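
# Illustrative usage sketch for the ignore_primary flag above: during a
# failover the source node (still recorded as primary) may be unreachable,
# so its shutdown errors must not abort the operation, which is why
# LUFailoverInstance later in this module calls:
#
#   _ShutdownInstanceDisks(self, instance, ignore_primary=True)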


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
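
# Illustrative usage sketch (mirrors the callers further down in this
# module, e.g. LUStartupInstance and LUFailoverInstance):
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
#
# i.e. the caller passes the required memory in MiB plus a human-readable
# reason that ends up in the OpPrereqError message.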


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    if not self.rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # only a full reboot touches the disks on the secondary nodes, so for
      # soft/hard reboots locking the primary node is enough
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not self.rpc.call_instance_reboot(node_current, instance,
                                           reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
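
# Illustrative summary of the reboot dispatch in LURebootInstance.Exec above:
#
#   reboot_type             action
#   ---------------------   ---------------------------------------------
#   INSTANCE_REBOOT_SOFT    call_instance_reboot on the primary node
#   INSTANCE_REBOOT_HARD    call_instance_reboot on the primary node
#   INSTANCE_REBOOT_FULL    shutdown, disk restart, call_instance_start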


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not self.rpc.call_instance_shutdown(node_current, instance):
      self.proc.LogWarning("Could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
                                           "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                               old_name):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
    bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "beparams",
      "serial_no", "hypervisor", "hvparams",
      ] + hvp + bep)

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
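
# Illustrative truth table for the "status" field computed above
# (configured state vs. live state reported by the primary node):
#
#   node reachable   running   instance.status    status value
#   --------------   -------   ---------------    --------------
#   no               n/a       n/a                "ERROR_nodedown"
#   yes              yes       != "down"          "running"
#   yes              yes       == "down"          "ERROR_up"
#   yes              no        != "down"          "ERROR_down"
#   yes              no        == "down"          "ADMIN_down"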
class LUFailoverInstance(LogicalUnit):
2842
  """Failover an instance.
2843

2844
  """
2845
  HPATH = "instance-failover"
2846
  HTYPE = constants.HTYPE_INSTANCE
2847
  _OP_REQP = ["instance_name", "ignore_consistency"]
2848
  REQ_BGL = False
2849

    
2850
  def ExpandNames(self):
2851
    self._ExpandAndLockInstance()
2852
    self.needed_locks[locking.LEVEL_NODE] = []
2853
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2854

    
2855
  def DeclareLocks(self, level):
2856
    if level == locking.LEVEL_NODE:
2857
      self._LockInstancesNodes()
2858

    
2859
  def BuildHooksEnv(self):
2860
    """Build hooks env.
2861

2862
    This runs on master, primary and secondary nodes of the instance.
2863

2864
    """
2865
    env = {
2866
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2867
      }
2868
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2869
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2870
    return env, nl, nl
2871

    
2872
  def CheckPrereq(self):
2873
    """Check prerequisites.
2874

2875
    This checks that the instance is in the cluster.
2876

2877
    """
2878
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2879
    assert self.instance is not None, \
2880
      "Cannot retrieve locked instance %s" % self.op.instance_name
2881

    
2882
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2883
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2884
      raise errors.OpPrereqError("Instance's disk layout is not"
2885
                                 " network mirrored, cannot failover.")
2886

    
2887
    secondary_nodes = instance.secondary_nodes
2888
    if not secondary_nodes:
2889
      raise errors.ProgrammerError("no secondary node but using "
2890
                                   "a mirrored disk template")
2891

    
2892
    target_node = secondary_nodes[0]
2893
    # check memory requirements on the secondary node
2894
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2895
                         instance.name, bep[constants.BE_MEMORY],
2896
                         instance.hypervisor)
2897

    
2898
    # check bridge existance
2899
    brlist = [nic.bridge for nic in instance.nics]
2900
    if not self.rpc.call_bridges_exist(target_node, brlist):
2901
      raise errors.OpPrereqError("One or more target bridges %s does not"
2902
                                 " exist on destination node '%s'" %
2903
                                 (brlist, target_node))
2904

    
2905
  def Exec(self, feedback_fn):
2906
    """Failover an instance.
2907

2908
    The failover is done by shutting it down on its present node and
2909
    starting it on the secondary.
2910

2911
    """
2912
    instance = self.instance
2913

    
2914
    source_node = instance.primary_node
2915
    target_node = instance.secondary_nodes[0]
2916

    
2917
    feedback_fn("* checking disk consistency between source and target")
2918
    for dev in instance.disks:
2919
      # for drbd, these are drbd over lvm
2920
      if not _CheckDiskConsistency(self, dev, target_node, False):
2921
        if instance.status == "up" and not self.op.ignore_consistency:
2922
          raise errors.OpExecError("Disk %s is degraded on target node,"
2923
                                   " aborting failover." % dev.iv_name)
2924

    
2925
    feedback_fn("* shutting down instance on source node")
2926
    logging.info("Shutting down instance %s on node %s",
2927
                 instance.name, source_node)
2928

    
2929
    if not self.rpc.call_instance_shutdown(source_node, instance):
2930
      if self.op.ignore_consistency:
2931
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
2932
                             " Proceeding"
2933
                             " anyway. Please make sure node %s is down",
2934
                             instance.name, source_node, source_node)
2935
      else:
2936
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2937
                                 (instance.name, source_node))
2938

    
2939
    feedback_fn("* deactivating the instance's disks on source node")
2940
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2941
      raise errors.OpExecError("Can't shut down the instance's disks.")
2942

    
2943
    instance.primary_node = target_node
2944
    # distribute new instance config to the other nodes
2945
    self.cfg.Update(instance)
2946

    
2947
    # Only start the instance if it's marked as up
2948
    if instance.status == "up":
2949
      feedback_fn("* activating the instance's disks on target node")
2950
      logging.info("Starting instance %s on node %s",
2951
                   instance.name, target_node)
2952

    
2953
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2954
                                               ignore_secondaries=True)
2955
      if not disks_ok:
2956
        _ShutdownInstanceDisks(self, instance)
2957
        raise errors.OpExecError("Can't activate the instance's disks")
2958

    
2959
      feedback_fn("* starting the instance on the target node")
2960
      if not self.rpc.call_instance_start(target_node, instance, None):
2961
        _ShutdownInstanceDisks(self, instance)
2962
        raise errors.OpExecError("Could not start instance %s on node %s." %
2963
                                 (instance.name, target_node))
2964

    
2965
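# Illustrative sketch, not part of the original module: a failover such as the
# one implemented by Exec() above is normally requested through an opcode that
# carries the two parameters this LU actually reads (instance_name and
# ignore_consistency).  The opcode class name used below is an assumption.
#
#   op = opcodes.OpFailoverInstance(instance_name="instance1.example.com",
#                                   ignore_consistency=False)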

    
2966
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2967
  """Create a tree of block devices on the primary node.
2968

2969
  This always creates all devices.
2970

2971
  """
2972
  if device.children:
2973
    for child in device.children:
2974
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2975
        return False
2976

    
2977
  lu.cfg.SetDiskID(device, node)
2978
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2979
                                       instance.name, True, info)
2980
  if not new_id:
2981
    return False
2982
  if device.physical_id is None:
2983
    device.physical_id = new_id
2984
  return True
2985

    
2986

    
2987
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2988
  """Create a tree of block devices on a secondary node.
2989

2990
  If this device type has to be created on secondaries, create it and
2991
  all its children.
2992

2993
  If not, just recurse to children keeping the same 'force' value.
2994

2995
  """
2996
  if device.CreateOnSecondary():
2997
    force = True
2998
  if device.children:
2999
    for child in device.children:
3000
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3001
                                        child, force, info):
3002
        return False
3003

    
3004
  if not force:
3005
    return True
3006
  lu.cfg.SetDiskID(device, node)
3007
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3008
                                       instance.name, False, info)
3009
  if not new_id:
3010
    return False
3011
  if device.physical_id is None:
3012
    device.physical_id = new_id
3013
  return True
3014

    
3015

    
3016
def _GenerateUniqueNames(lu, exts):
3017
  """Generate a suitable LV name.
3018

3019
  This will generate one unique logical volume name for each passed extension.
3020

3021
  """
3022
  results = []
3023
  for val in exts:
3024
    new_id = lu.cfg.GenerateUniqueID()
3025
    results.append("%s%s" % (new_id, val))
3026
  return results
3027

    
3028
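# Illustrative sketch, not part of the original module: _GenerateUniqueNames
# above simply prefixes each requested extension with a cluster-wide unique
# ID, so a call like the following (the ID shown is made up) yields names
# ready for use as LV names:
#
#   _GenerateUniqueNames(lu, [".sda_data", ".sda_meta"])
#   # -> ["<unique-id>.sda_data", "<unique-id>.sda_meta"]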

    
3029
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3030
                         p_minor, s_minor):
3031
  """Generate a drbd8 device complete with its children.
3032

3033
  """
3034
  port = lu.cfg.AllocatePort()
3035
  vgname = lu.cfg.GetVGName()
3036
  shared_secret = lu.cfg.GenerateDRBDSecret()
3037
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3038
                          logical_id=(vgname, names[0]))
3039
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3040
                          logical_id=(vgname, names[1]))
3041
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3042
                          logical_id=(primary, secondary, port,
3043
                                      p_minor, s_minor,
3044
                                      shared_secret),
3045
                          children=[dev_data, dev_meta],
3046
                          iv_name=iv_name)
3047
  return drbd_dev
3048

    
3049
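# Illustrative sketch, not part of the original module: the object returned by
# _GenerateDRBD8Branch above is a DRBD8 disk whose two children are the data
# LV and the 128 MB metadata LV, roughly:
#
#   Disk(dev_type=LD_DRBD8, size=size, iv_name=iv_name,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        children=[Disk(LD_LV, size=size, logical_id=(vgname, names[0])),
#                  Disk(LD_LV, size=128,  logical_id=(vgname, names[1]))])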

    
3050
def _GenerateDiskTemplate(lu, template_name,
3051
                          instance_name, primary_node,
3052
                          secondary_nodes, disk_sz, swap_sz,
3053
                          file_storage_dir, file_driver):
3054
  """Generate the entire disk layout for a given template type.
3055

3056
  """
3057
  #TODO: compute space requirements
3058

    
3059
  vgname = lu.cfg.GetVGName()
3060
  if template_name == constants.DT_DISKLESS:
3061
    disks = []
3062
  elif template_name == constants.DT_PLAIN:
3063
    if len(secondary_nodes) != 0:
3064
      raise errors.ProgrammerError("Wrong template configuration")
3065

    
3066
    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3067
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3068
                           logical_id=(vgname, names[0]),
3069
                           iv_name = "sda")
3070
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3071
                           logical_id=(vgname, names[1]),
3072
                           iv_name = "sdb")
3073
    disks = [sda_dev, sdb_dev]
3074
  elif template_name == constants.DT_DRBD8:
3075
    if len(secondary_nodes) != 1:
3076
      raise errors.ProgrammerError("Wrong template configuration")
3077
    remote_node = secondary_nodes[0]
3078
    (minor_pa, minor_pb,
3079
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3080
      [primary_node, primary_node, remote_node, remote_node], instance_name)
3081

    
3082
    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3083
                                      ".sdb_data", ".sdb_meta"])
3084
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3085
                                        disk_sz, names[0:2], "sda",
3086
                                        minor_pa, minor_sa)
3087
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3088
                                        swap_sz, names[2:4], "sdb",
3089
                                        minor_pb, minor_sb)
3090
    disks = [drbd_sda_dev, drbd_sdb_dev]
3091
  elif template_name == constants.DT_FILE:
3092
    if len(secondary_nodes) != 0:
3093
      raise errors.ProgrammerError("Wrong template configuration")
3094

    
3095
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3096
                                iv_name="sda", logical_id=(file_driver,
3097
                                "%s/sda" % file_storage_dir))
3098
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3099
                                iv_name="sdb", logical_id=(file_driver,
3100
                                "%s/sdb" % file_storage_dir))
3101
    disks = [file_sda_dev, file_sdb_dev]
3102
  else:
3103
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3104
  return disks
3105

    
3106

    
3107
def _GetInstanceInfoText(instance):
3108
  """Compute that text that should be added to the disk's metadata.
3109

3110
  """
3111
  return "originstname+%s" % instance.name
3112

    
3113
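# Illustrative sketch, not part of the original module: the metadata text is
# just the instance name with a fixed prefix, e.g. for an instance called
# "instance1.example.com":
#
#   _GetInstanceInfoText(instance)  # -> "originstname+instance1.example.com"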

    
3114
def _CreateDisks(lu, instance):
3115
  """Create all disks for an instance.
3116

3117
  This abstracts away some work from AddInstance.
3118

3119
  Args:
3120
    instance: the instance object
3121

3122
  Returns:
3123
    True or False showing the success of the creation process
3124

3125
  """
3126
  info = _GetInstanceInfoText(instance)
3127

    
3128
  if instance.disk_template == constants.DT_FILE:
3129
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3130
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3131
                                                 file_storage_dir)
3132

    
3133
    if not result:
3134
      logging.error("Could not connect to node '%s'", instance.primary_node)
3135
      return False
3136

    
3137
    if not result[0]:
3138
      logging.error("Failed to create directory '%s'", file_storage_dir)
3139
      return False
3140

    
3141
  for device in instance.disks:
3142
    logging.info("Creating volume %s for instance %s",
3143
                 device.iv_name, instance.name)
3144
    #HARDCODE
3145
    for secondary_node in instance.secondary_nodes:
3146
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3147
                                        device, False, info):
3148
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3149
                      device.iv_name, device, secondary_node)
3150
        return False
3151
    #HARDCODE
3152
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3153
                                    instance, device, info):
3154
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3155
      return False
3156

    
3157
  return True
3158

    
3159
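# Illustrative note, not part of the original module: callers are expected to
# pair _CreateDisks above with _RemoveDisks (defined next) for rollback, as
# LUCreateInstance.Exec does further down:
#
#   if not _CreateDisks(self, iobj):
#     _RemoveDisks(self, iobj)
#     self.cfg.ReleaseDRBDMinors(instance)
#     raise errors.OpExecError("Device creation failed, reverting...")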

    
3160
def _RemoveDisks(lu, instance):
3161
  """Remove all disks for an instance.
3162

3163
  This abstracts away some work from `AddInstance()` and
3164
  `RemoveInstance()`. Note that in case some of the devices couldn't
3165
  be removed, the removal will continue with the other ones (compare
3166
  with `_CreateDisks()`).
3167

3168
  Args:
3169
    instance: the instance object
3170

3171
  Returns:
3172
    True or False showing the success of the removal process
3173

3174
  """
3175
  logging.info("Removing block devices for instance %s", instance.name)
3176

    
3177
  result = True
3178
  for device in instance.disks:
3179
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3180
      lu.cfg.SetDiskID(disk, node)
3181
      if not lu.rpc.call_blockdev_remove(node, disk):
3182
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3183
                           " continuing anyway", device.iv_name, node)
3184
        result = False
3185

    
3186
  if instance.disk_template == constants.DT_FILE:
3187
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3188
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3189
                                               file_storage_dir):
3190
      logging.error("Could not remove directory '%s'", file_storage_dir)
3191
      result = False
3192

    
3193
  return result
3194

    
3195

    
3196
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3197
  """Compute disk size requirements in the volume group
3198

3199
  This is currently hard-coded for the two-drive layout.
3200

3201
  """
3202
  # Required free disk space as a function of disk and swap space
3203
  req_size_dict = {
3204
    constants.DT_DISKLESS: None,
3205
    constants.DT_PLAIN: disk_size + swap_size,
3206
    # 256 MB are added for drbd metadata (128 MB for each of the two devices)
3207
    constants.DT_DRBD8: disk_size + swap_size + 256,
3208
    constants.DT_FILE: None,
3209
  }
3210

    
3211
  if disk_template not in req_size_dict:
3212
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3213
                                 " is unknown" %  disk_template)
3214

    
3215
  return req_size_dict[disk_template]
3216

    
3217
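# Illustrative sketch, not part of the original module: expected return values
# of _ComputeDiskSize above for a 10240 MB disk plus 4096 MB swap (None means
# no space is needed in the volume group); the drbd8 figure includes the
# 256 MB metadata overhead mentioned in the function.
#
#   _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096)     # -> 14336
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)     # -> 14592
#   _ComputeDiskSize(constants.DT_DISKLESS, 10240, 4096)  # -> None
#   _ComputeDiskSize(constants.DT_FILE, 10240, 4096)      # -> None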

    
3218
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3219
  """Hypervisor parameter validation.
3220

3221
  This function abstracts the hypervisor parameter validation to be
3222
  used in both instance create and instance modify.
3223

3224
  @type lu: L{LogicalUnit}
3225
  @param lu: the logical unit for which we check
3226
  @type nodenames: list
3227
  @param nodenames: the list of nodes on which we should check
3228
  @type hvname: string
3229
  @param hvname: the name of the hypervisor we should use
3230
  @type hvparams: dict
3231
  @param hvparams: the parameters which we need to check
3232
  @raise errors.OpPrereqError: if the parameters are not valid
3233

3234
  """
3235
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3236
                                                  hvname,
3237
                                                  hvparams)
3238
  for node in nodenames:
3239
    info = hvinfo.get(node, None)
3240
    if not info or not isinstance(info, (tuple, list)):
3241
      raise errors.OpPrereqError("Cannot get current information"
3242
                                 " from node '%s' (%s)" % (node, info))
3243
    if not info[0]:
3244
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3245
                                 " %s" % info[1])
3246

    
3247
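# Illustrative sketch, not part of the original module: judging from the
# checks in _CheckHVParams above, the per-node value returned by
# call_hypervisor_validate_params is expected to be a (status, message) pair,
# for example:
#
#   hvinfo = {"node1.example.com": (True, "parameters ok"),
#             "node2.example.com": (False, "missing kernel path")}
#
# A false status, or a missing or malformed entry, makes the check raise
# OpPrereqError for that node.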

    
3248
class LUCreateInstance(LogicalUnit):
3249
  """Create an instance.
3250

3251
  """
3252
  HPATH = "instance-add"
3253
  HTYPE = constants.HTYPE_INSTANCE
3254
  _OP_REQP = ["instance_name", "disk_size",
3255
              "disk_template", "swap_size", "mode", "start",
3256
              "wait_for_sync", "ip_check", "mac",
3257
              "hvparams", "beparams"]
3258
  REQ_BGL = False
3259

    
3260
  def _ExpandNode(self, node):
3261
    """Expands and checks one node name.
3262

3263
    """
3264
    node_full = self.cfg.ExpandNodeName(node)
3265
    if node_full is None:
3266
      raise errors.OpPrereqError("Unknown node %s" % node)
3267
    return node_full
3268

    
3269
  def ExpandNames(self):
3270
    """ExpandNames for CreateInstance.
3271

3272
    Figure out the right locks for instance creation.
3273

3274
    """
3275
    self.needed_locks = {}
3276

    
3277
    # set optional parameters to None if they don't exist
3278
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3279
      if not hasattr(self.op, attr):
3280
        setattr(self.op, attr, None)
3281

    
3282
    # cheap checks, mostly valid constants given
3283

    
3284
    # verify creation mode
3285
    if self.op.mode not in (constants.INSTANCE_CREATE,
3286
                            constants.INSTANCE_IMPORT):
3287
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3288
                                 self.op.mode)
3289

    
3290
    # disk template and mirror node verification
3291
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3292
      raise errors.OpPrereqError("Invalid disk template name")
3293

    
3294
    if self.op.hypervisor is None:
3295
      self.op.hypervisor = self.cfg.GetHypervisorType()
3296

    
3297
    cluster = self.cfg.GetClusterInfo()
3298
    enabled_hvs = cluster.enabled_hypervisors
3299
    if self.op.hypervisor not in enabled_hvs:
3300
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3301
                                 " cluster (%s)" % (self.op.hypervisor,
3302
                                  ",".join(enabled_hvs)))
3303

    
3304
    # check hypervisor parameter syntax (locally)
3305

    
3306
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3307
                                  self.op.hvparams)
3308
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3309
    hv_type.CheckParameterSyntax(filled_hvp)
3310

    
3311
    # fill and remember the beparams dict
3312
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3313
                                    self.op.beparams)
3314

    
3315
    #### instance parameters check
3316

    
3317
    # instance name verification
3318
    hostname1 = utils.HostInfo(self.op.instance_name)
3319
    self.op.instance_name = instance_name = hostname1.name
3320

    
3321
    # this is just a preventive check, but someone might still add this
3322
    # instance in the meantime, and creation will fail at lock-add time
3323
    if instance_name in self.cfg.GetInstanceList():
3324
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3325
                                 instance_name)
3326

    
3327
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3328

    
3329
    # ip validity checks
3330
    ip = getattr(self.op, "ip", None)
3331
    if ip is None or ip.lower() == "none":
3332
      inst_ip = None
3333
    elif ip.lower() == constants.VALUE_AUTO:
3334
      inst_ip = hostname1.ip
3335
    else:
3336
      if not utils.IsValidIP(ip):
3337
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3338
                                   " like a valid IP" % ip)
3339
      inst_ip = ip
3340
    self.inst_ip = self.op.ip = inst_ip
3341
    # used in CheckPrereq for ip ping check
3342
    self.check_ip = hostname1.ip
3343

    
3344
    # MAC address verification
3345
    if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3346
      if not utils.IsValidMac(self.op.mac.lower()):
3347
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3348
                                   self.op.mac)
3349

    
3350
    # file storage checks
3351
    if (self.op.file_driver and
3352
        not self.op.file_driver in constants.FILE_DRIVER):
3353
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3354
                                 self.op.file_driver)
3355

    
3356
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3357
      raise errors.OpPrereqError("File storage directory path not absolute")
3358

    
3359
    ### Node/iallocator related checks
3360
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3361
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3362
                                 " node must be given")
3363

    
3364
    if self.op.iallocator:
3365
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3366
    else:
3367
      self.op.pnode = self._ExpandNode(self.op.pnode)
3368
      nodelist = [self.op.pnode]
3369
      if self.op.snode is not None:
3370
        self.op.snode = self._ExpandNode(self.op.snode)
3371
        nodelist.append(self.op.snode)
3372
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3373

    
3374
    # in case of import lock the source node too
3375
    if self.op.mode == constants.INSTANCE_IMPORT:
3376
      src_node = getattr(self.op, "src_node", None)
3377
      src_path = getattr(self.op, "src_path", None)
3378

    
3379
      if src_node is None or src_path is None:
3380
        raise errors.OpPrereqError("Importing an instance requires source"
3381
                                   " node and path options")
3382

    
3383
      if not os.path.isabs(src_path):
3384
        raise errors.OpPrereqError("The source path must be absolute")
3385

    
3386
      self.op.src_node = src_node = self._ExpandNode(src_node)
3387
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3388
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3389

    
3390
    else: # INSTANCE_CREATE
3391
      if getattr(self.op, "os_type", None) is None:
3392
        raise errors.OpPrereqError("No guest OS specified")
3393

    
3394
  def _RunAllocator(self):
3395
    """Run the allocator based on input opcode.
3396

3397
    """
3398
    disks = [{"size": self.op.disk_size, "mode": "w"},
3399
             {"size": self.op.swap_size, "mode": "w"}]
3400
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3401
             "bridge": self.op.bridge}]
3402
    ial = IAllocator(self,
3403
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3404
                     name=self.op.instance_name,
3405
                     disk_template=self.op.disk_template,
3406
                     tags=[],
3407
                     os=self.op.os_type,
3408
                     vcpus=self.be_full[constants.BE_VCPUS],
3409
                     mem_size=self.be_full[constants.BE_MEMORY],
3410
                     disks=disks,
3411
                     nics=nics,
3412
                     )
3413

    
3414
    ial.Run(self.op.iallocator)
3415

    
3416
    if not ial.success:
3417
      raise errors.OpPrereqError("Can't compute nodes using"
3418
                                 " iallocator '%s': %s" % (self.op.iallocator,
3419
                                                           ial.info))
3420
    if len(ial.nodes) != ial.required_nodes:
3421
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3422
                                 " of nodes (%s), required %s" %
3423
                                 (self.op.iallocator, len(ial.nodes),
3424
                                  ial.required_nodes))
3425
    self.op.pnode = ial.nodes[0]
3426
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3427
                 self.op.instance_name, self.op.iallocator,
3428
                 ", ".join(ial.nodes))
3429
    if ial.required_nodes == 2:
3430
      self.op.snode = ial.nodes[1]
3431
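  # Illustrative sketch, not part of the original module: the allocation
  # request built by _RunAllocator above describes the new instance with plain
  # dicts, along these lines (sizes, MAC and bridge are example values):
  #
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  #
  # The iallocator answers with ial.nodes, from which the primary and, for
  # mirrored templates, the secondary node are taken.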

    
3432
  def BuildHooksEnv(self):
3433
    """Build hooks env.
3434

3435
    This runs on master, primary and secondary nodes of the instance.
3436

3437
    """
3438
    env = {
3439
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3440
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3441
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3442
      "INSTANCE_ADD_MODE": self.op.mode,
3443
      }
3444
    if self.op.mode == constants.INSTANCE_IMPORT:
3445
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3446
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3447
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3448

    
3449
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3450
      primary_node=self.op.pnode,
3451
      secondary_nodes=self.secondaries,
3452
      status=self.instance_status,
3453
      os_type=self.op.os_type,
3454
      memory=self.be_full[constants.BE_MEMORY],
3455
      vcpus=self.be_full[constants.BE_VCPUS],
3456
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3457
    ))
3458

    
3459
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3460
          self.secondaries)
3461
    return env, nl, nl
3462

    
3463

    
3464
  def CheckPrereq(self):
3465
    """Check prerequisites.
3466

3467
    """
3468
    if (not self.cfg.GetVGName() and
3469
        self.op.disk_template not in constants.DTS_NOT_LVM):
3470
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3471
                                 " instances")
3472

    
3473

    
3474
    if self.op.mode == constants.INSTANCE_IMPORT:
3475
      src_node = self.op.src_node
3476
      src_path = self.op.src_path
3477

    
3478
      export_info = self.rpc.call_export_info(src_node, src_path)
3479

    
3480
      if not export_info:
3481
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3482

    
3483
      if not export_info.has_section(constants.INISECT_EXP):
3484
        raise errors.ProgrammerError("Corrupted export config")
3485

    
3486
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3487
      if (int(ei_version) != constants.EXPORT_VERSION):
3488
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3489
                                   (ei_version, constants.EXPORT_VERSION))
3490

    
3491
      # Check that the new instance doesn't have fewer disks than the export
3492
      # TODO: substitute "2" with the actual number of disks requested
3493
      instance_disks = 2
3494
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3495
      if instance_disks < export_disks:
3496
        raise errors.OpPrereqError("Not enough disks to import."
3497
                                   " (instance: %d, export: %d)" %
3498
                                   (instance_disks, export_disks))
3499

    
3500
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3501
      disk_images = []
3502
      for idx in range(export_disks):
3503
        option = 'disk%d_dump' % idx
3504
        if export_info.has_option(constants.INISECT_INS, option):
3505
          # FIXME: are the old os-es, disk sizes, etc. useful?
3506
          export_name = export_info.get(constants.INISECT_INS, option)
3507
          image = os.path.join(src_path, export_name)
3508
          disk_images.append(image)
3509
        else:
3510
          disk_images.append(False)
3511

    
3512
      self.src_images = disk_images
3513

    
3514
      if self.op.mac == constants.VALUE_AUTO:
3515
        old_name = export_info.get(constants.INISECT_INS, 'name')
3516
        if self.op.instance_name == old_name:
3517
          # FIXME: adjust every nic, when we'll be able to create instances
3518
          # with more than one
3519
          if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3520
            self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3521

    
3522
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3523

    
3524
    if self.op.start and not self.op.ip_check:
3525
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3526
                                 " adding an instance in start mode")
3527

    
3528
    if self.op.ip_check:
3529
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3530
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3531
                                   (self.check_ip, self.op.instance_name))
3532

    
3533
    # bridge verification
3534
    bridge = getattr(self.op, "bridge", None)
3535
    if bridge is None:
3536
      self.op.bridge = self.cfg.GetDefBridge()
3537
    else:
3538
      self.op.bridge = bridge
3539

    
3540
    #### allocator run
3541

    
3542
    if self.op.iallocator is not None:
3543
      self._RunAllocator()
3544

    
3545
    #### node related checks
3546

    
3547
    # check primary node
3548
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3549
    assert self.pnode is not None, \
3550
      "Cannot retrieve locked node %s" % self.op.pnode
3551
    self.secondaries = []
3552

    
3553
    # mirror node verification
3554
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3555
      if self.op.snode is None:
3556
        raise errors.OpPrereqError("The networked disk templates need"
3557
                                   " a mirror node")
3558
      if self.op.snode == pnode.name:
3559
        raise errors.OpPrereqError("The secondary node cannot be"
3560
                                   " the primary node.")
3561
      self.secondaries.append(self.op.snode)
3562

    
3563
    nodenames = [pnode.name] + self.secondaries
3564

    
3565
    req_size = _ComputeDiskSize(self.op.disk_template,
3566
                                self.op.disk_size, self.op.swap_size)
3567

    
3568
    # Check lv size requirements
3569
    if req_size is not None:
3570
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3571
                                         self.op.hypervisor)
3572
      for node in nodenames:
3573
        info = nodeinfo.get(node, None)
3574
        if not info:
3575
          raise errors.OpPrereqError("Cannot get current information"
3576
                                     " from node '%s'" % node)
3577
        vg_free = info.get('vg_free', None)
3578
        if not isinstance(vg_free, int):
3579
          raise errors.OpPrereqError("Can't compute free disk space on"
3580
                                     " node %s" % node)
3581
        if req_size > info['vg_free']:
3582
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3583
                                     " %d MB available, %d MB required" %
3584
                                     (node, info['vg_free'], req_size))
3585

    
3586
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3587

    
3588
    # os verification
3589
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3590
    if not os_obj:
3591
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3592
                                 " primary node"  % self.op.os_type)
3593

    
3594
    # bridge check on primary node
3595
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3596
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3597
                                 " destination node '%s'" %
3598
                                 (self.op.bridge, pnode.name))
3599

    
3600
    # memory check on primary node
3601
    if self.op.start:
3602
      _CheckNodeFreeMemory(self, self.pnode.name,
3603
                           "creating instance %s" % self.op.instance_name,
3604
                           self.be_full[constants.BE_MEMORY],
3605
                           self.op.hypervisor)
3606

    
3607
    if self.op.start:
3608
      self.instance_status = 'up'
3609
    else:
3610
      self.instance_status = 'down'
3611

    
3612
  def Exec(self, feedback_fn):
3613
    """Create and add the instance to the cluster.
3614

3615
    """
3616
    instance = self.op.instance_name
3617
    pnode_name = self.pnode.name
3618

    
3619
    if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3620
      mac_address = self.cfg.GenerateMAC()
3621
    else:
3622
      mac_address = self.op.mac
3623

    
3624
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3625
    if self.inst_ip is not None:
3626
      nic.ip = self.inst_ip
3627

    
3628
    ht_kind = self.op.hypervisor
3629
    if ht_kind in constants.HTS_REQ_PORT:
3630
      network_port = self.cfg.AllocatePort()
3631
    else:
3632
      network_port = None
3633

    
3634
    ##if self.op.vnc_bind_address is None:
3635
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3636

    
3637
    # this is needed because os.path.join does not accept None arguments
3638
    if self.op.file_storage_dir is None:
3639
      string_file_storage_dir = ""
3640
    else:
3641
      string_file_storage_dir = self.op.file_storage_dir
3642

    
3643
    # build the full file storage dir path
3644
    file_storage_dir = os.path.normpath(os.path.join(
3645
                                        self.cfg.GetFileStorageDir(),
3646
                                        string_file_storage_dir, instance))
3647

    
3648

    
3649
    disks = _GenerateDiskTemplate(self,
3650
                                  self.op.disk_template,
3651
                                  instance, pnode_name,
3652
                                  self.secondaries, self.op.disk_size,
3653
                                  self.op.swap_size,
3654
                                  file_storage_dir,
3655
                                  self.op.file_driver)
3656

    
3657
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3658
                            primary_node=pnode_name,
3659
                            nics=[nic], disks=disks,
3660
                            disk_template=self.op.disk_template,
3661
                            status=self.instance_status,
3662
                            network_port=network_port,
3663
                            beparams=self.op.beparams,
3664
                            hvparams=self.op.hvparams,
3665
                            hypervisor=self.op.hypervisor,
3666
                            )
3667

    
3668
    feedback_fn("* creating instance disks...")
3669
    if not _CreateDisks(self, iobj):
3670
      _RemoveDisks(self, iobj)
3671
      self.cfg.ReleaseDRBDMinors(instance)
3672
      raise errors.OpExecError("Device creation failed, reverting...")
3673

    
3674
    feedback_fn("adding instance %s to cluster config" % instance)
3675

    
3676
    self.cfg.AddInstance(iobj)
3677
    # Declare that we don't want to remove the instance lock anymore, as we've
3678
    # added the instance to the config
3679
    del self.remove_locks[locking.LEVEL_INSTANCE]
3680
    # Remove the temp. assignments for the instance's drbds
3681
    self.cfg.ReleaseDRBDMinors(instance)
3682

    
3683
    if self.op.wait_for_sync:
3684
      disk_abort = not _WaitForSync(self, iobj)
3685
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3686
      # make sure the disks are not degraded (still sync-ing is ok)
3687
      time.sleep(15)
3688
      feedback_fn("* checking mirrors status")
3689
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3690
    else:
3691
      disk_abort = False
3692

    
3693
    if disk_abort:
3694
      _RemoveDisks(self, iobj)
3695
      self.cfg.RemoveInstance(iobj.name)
3696
      # Make sure the instance lock gets removed
3697
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3698
      raise errors.OpExecError("There are some degraded disks for"
3699
                               " this instance")
3700

    
3701
    feedback_fn("creating os for instance %s on node %s" %
3702
                (instance, pnode_name))
3703

    
3704
    if iobj.disk_template != constants.DT_DISKLESS:
3705
      if self.op.mode == constants.INSTANCE_CREATE:
3706
        feedback_fn("* running the instance OS create scripts...")
3707
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
3708
          raise errors.OpExecError("could not add os for instance %s"
3709
                                   " on node %s" %
3710
                                   (instance, pnode_name))
3711

    
3712
      elif self.op.mode == constants.INSTANCE_IMPORT:
3713
        feedback_fn("* running the instance OS import scripts...")
3714
        src_node = self.op.src_node
3715
        src_images = self.src_images
3716
        cluster_name = self.cfg.GetClusterName()
3717
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3718
                                                         src_node, src_images,
3719
                                                         cluster_name)
3720
        for idx, result in enumerate(import_result):
3721
          if not result:
3722
            self.LogWarning("Could not image %s for on instance %s, disk %d,"
3723
                            " on node %s" % (src_images[idx], instance, idx,
3724
                                             pnode_name))
3725
      else:
3726
        # also checked in the prereq part
3727
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3728
                                     % self.op.mode)
3729

    
3730
    if self.op.start:
3731
      logging.info("Starting instance %s on node %s", instance, pnode_name)
3732
      feedback_fn("* starting instance...")
3733
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3734
        raise errors.OpExecError("Could not start instance")
3735

    
3736

    
3737
class LUConnectConsole(NoHooksLU):
3738
  """Connect to an instance's console.
3739

3740
  This is somewhat special in that it returns the command line that
3741
  you need to run on the master node in order to connect to the
3742
  console.
3743

3744
  """
3745
  _OP_REQP = ["instance_name"]
3746
  REQ_BGL = False
3747

    
3748
  def ExpandNames(self):
3749
    self._ExpandAndLockInstance()
3750

    
3751
  def CheckPrereq(self):
3752
    """Check prerequisites.
3753

3754
    This checks that the instance is in the cluster.
3755

3756
    """
3757
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3758
    assert self.instance is not None, \
3759
      "Cannot retrieve locked instance %s" % self.op.instance_name
3760

    
3761
  def Exec(self, feedback_fn):
3762
    """Connect to the console of an instance
3763

3764
    """
3765
    instance = self.instance
3766
    node = instance.primary_node
3767

    
3768
    node_insts = self.rpc.call_instance_list([node],
3769
                                             [instance.hypervisor])[node]
3770
    if node_insts is False:
3771
      raise errors.OpExecError("Can't connect to node %s." % node)
3772

    
3773
    if instance.name not in node_insts:
3774
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3775

    
3776
    logging.debug("Connecting to console of %s on %s", instance.name, node)
3777

    
3778
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3779
    console_cmd = hyper.GetShellCommandForConsole(instance)
3780

    
3781
    # build ssh cmdline
3782
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3783

    
3784

    
3785
class LUReplaceDisks(LogicalUnit):
3786
  """Replace the disks of an instance.
3787

3788
  """
3789
  HPATH = "mirrors-replace"
3790
  HTYPE = constants.HTYPE_INSTANCE
3791
  _OP_REQP = ["instance_name", "mode", "disks"]
3792
  REQ_BGL = False
3793

    
3794
  def ExpandNames(self):
3795
    self._ExpandAndLockInstance()
3796

    
3797
    if not hasattr(self.op, "remote_node"):
3798
      self.op.remote_node = None
3799

    
3800
    ia_name = getattr(self.op, "iallocator", None)
3801
    if ia_name is not None:
3802
      if self.op.remote_node is not None:
3803
        raise errors.OpPrereqError("Give either the iallocator or the new"
3804
                                   " secondary, not both")
3805
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3806
    elif self.op.remote_node is not None:
3807
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3808
      if remote_node is None:
3809
        raise errors.OpPrereqError("Node '%s' not known" %
3810
                                   self.op.remote_node)
3811
      self.op.remote_node = remote_node
3812
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3813
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3814
    else:
3815
      self.needed_locks[locking.LEVEL_NODE] = []
3816
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3817

    
3818
  def DeclareLocks(self, level):
3819
    # If we're not already locking all nodes in the set we have to declare the
3820
    # instance's primary/secondary nodes.
3821
    if (level == locking.LEVEL_NODE and
3822
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3823
      self._LockInstancesNodes()
3824

    
3825
  def _RunAllocator(self):
3826
    """Compute a new secondary node using an IAllocator.
3827

3828
    """
3829
    ial = IAllocator(self,
3830
                     mode=constants.IALLOCATOR_MODE_RELOC,
3831
                     name=self.op.instance_name,
3832
                     relocate_from=[self.sec_node])
3833

    
3834
    ial.Run(self.op.iallocator)
3835

    
3836
    if not ial.success:
3837
      raise errors.OpPrereqError("Can't compute nodes using"
3838
                                 " iallocator '%s': %s" % (self.op.iallocator,
3839
                                                           ial.info))
3840
    if len(ial.nodes) != ial.required_nodes:
3841
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3842
                                 " of nodes (%s), required %s" %
3843
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3844
    self.op.remote_node = ial.nodes[0]
3845
    self.LogInfo("Selected new secondary for the instance: %s",
3846
                 self.op.remote_node)
3847

    
3848
  def BuildHooksEnv(self):
3849
    """Build hooks env.
3850

3851
    This runs on the master, the primary and all the secondaries.
3852

3853
    """
3854
    env = {
3855
      "MODE": self.op.mode,
3856
      "NEW_SECONDARY": self.op.remote_node,
3857
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3858
      }
3859
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3860
    nl = [
3861
      self.cfg.GetMasterNode(),
3862
      self.instance.primary_node,
3863
      ]
3864
    if self.op.remote_node is not None:
3865
      nl.append(self.op.remote_node)
3866
    return env, nl, nl
3867

    
3868
  def CheckPrereq(self):
3869
    """Check prerequisites.
3870

3871
    This checks that the instance is in the cluster.
3872

3873
    """
3874
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3875
    assert instance is not None, \
3876
      "Cannot retrieve locked instance %s" % self.op.instance_name
3877
    self.instance = instance
3878

    
3879
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3880
      raise errors.OpPrereqError("Instance's disk layout is not"
3881
                                 " network mirrored.")
3882

    
3883
    if len(instance.secondary_nodes) != 1:
3884
      raise errors.OpPrereqError("The instance has a strange layout,"
3885
                                 " expected one secondary but found %d" %
3886
                                 len(instance.secondary_nodes))
3887

    
3888
    self.sec_node = instance.secondary_nodes[0]
3889

    
3890
    ia_name = getattr(self.op, "iallocator", None)
3891
    if ia_name is not None:
3892
      self._RunAllocator()
3893

    
3894
    remote_node = self.op.remote_node
3895
    if remote_node is not None:
3896
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3897
      assert self.remote_node_info is not None, \
3898
        "Cannot retrieve locked node %s" % remote_node
3899
    else:
3900
      self.remote_node_info = None
3901
    if remote_node == instance.primary_node:
3902
      raise errors.OpPrereqError("The specified node is the primary node of"
3903
                                 " the instance.")
3904
    elif remote_node == self.sec_node:
3905
      if self.op.mode == constants.REPLACE_DISK_SEC:
3906
        # this is for DRBD8, where we can't execute the same mode of
3907
        # replacement as for drbd7 (no different port allocated)
3908
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3909
                                   " replacement")
3910
    if instance.disk_template == constants.DT_DRBD8:
3911
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3912
          remote_node is not None):
3913
        # switch to replace secondary mode
3914
        self.op.mode = constants.REPLACE_DISK_SEC
3915

    
3916
      if self.op.mode == constants.REPLACE_DISK_ALL:
3917
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3918
                                   " secondary disk replacement, not"
3919
                                   " both at once")
3920
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3921
        if remote_node is not None:
3922
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3923
                                     " the secondary while doing a primary"
3924
                                     " node disk replacement")
3925
        self.tgt_node = instance.primary_node
3926
        self.oth_node = instance.secondary_nodes[0]
3927
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3928
        self.new_node = remote_node # this can be None, in which case
3929
                                    # we don't change the secondary
3930
        self.tgt_node = instance.secondary_nodes[0]
3931
        self.oth_node = instance.primary_node
3932
      else:
3933
        raise errors.ProgrammerError("Unhandled disk replace mode")
3934

    
3935
    for name in self.op.disks:
3936
      if instance.FindDisk(name) is None:
3937
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3938
                                   (name, instance.name))
3939

    
3940
  def _ExecD8DiskOnly(self, feedback_fn):
3941
    """Replace a disk on the primary or secondary for dbrd8.
3942

3943
    The algorithm for replace is quite complicated:
3944
      - for each disk to be replaced:
3945
        - create new LVs on the target node with unique names
3946
        - detach old LVs from the drbd device
3947
        - rename old LVs to name_replaced.<time_t>
3948
        - rename new LVs to old LVs
3949
        - attach the new LVs (with the old names now) to the drbd device
3950
      - wait for sync across all devices
3951
      - for each modified disk:
3952
        - remove old LVs (which have the name name_replaced.<time_t>)
3953

3954
    Failures are not very well handled.
3955

3956
    """
3957
    steps_total = 6
3958
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3959
    instance = self.instance
3960
    iv_names = {}
3961
    vgname = self.cfg.GetVGName()
3962
    # start of work
3963
    cfg = self.cfg
3964
    tgt_node = self.tgt_node
3965
    oth_node = self.oth_node
3966

    
3967
    # Step: check device activation
3968
    self.proc.LogStep(1, steps_total, "check device existence")
3969
    info("checking volume groups")
3970
    my_vg = cfg.GetVGName()
3971
    results = self.rpc.call_vg_list([oth_node, tgt_node])
3972
    if not results:
3973
      raise errors.OpExecError("Can't list volume groups on the nodes")
3974
    for node in oth_node, tgt_node:
3975
      res = results.get(node, False)
3976
      if not res or my_vg not in res:
3977
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3978
                                 (my_vg, node))
3979
    for dev in instance.disks:
3980
      if not dev.iv_name in self.op.disks:
3981
        continue
3982
      for node in tgt_node, oth_node:
3983
        info("checking %s on %s" % (dev.iv_name, node))
3984
        cfg.SetDiskID(dev, node)
3985
        if not self.rpc.call_blockdev_find(node, dev):
3986
          raise errors.OpExecError("Can't find device %s on node %s" %
3987
                                   (dev.iv_name, node))
3988

    
3989
    # Step: check other node consistency
3990
    self.proc.LogStep(2, steps_total, "check peer consistency")
3991
    for dev in instance.disks:
3992
      if not dev.iv_name in self.op.disks:
3993
        continue
3994
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3995
      if not _CheckDiskConsistency(self, dev, oth_node,
3996
                                   oth_node==instance.primary_node):
3997
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3998
                                 " to replace disks on this node (%s)" %
3999
                                 (oth_node, tgt_node))
4000

    
4001
    # Step: create new storage
4002
    self.proc.LogStep(3, steps_total, "allocate new storage")
4003
    for dev in instance.disks:
4004
      if not dev.iv_name in self.op.disks:
4005
        continue
4006
      size = dev.size
4007
      cfg.SetDiskID(dev, tgt_node)
4008
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4009
      names = _GenerateUniqueNames(self, lv_names)
4010
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4011
                             logical_id=(vgname, names[0]))
4012
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4013
                             logical_id=(vgname, names[1]))
4014
      new_lvs = [lv_data, lv_meta]
4015
      old_lvs = dev.children
4016
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4017
      info("creating new local storage on %s for %s" %
4018
           (tgt_node, dev.iv_name))
4019
      # since we *always* want to create this LV, we use the
4020
      # _Create...OnPrimary (which forces the creation), even if we
4021
      # are talking about the secondary node
4022
      for new_lv in new_lvs:
4023
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4024
                                        _GetInstanceInfoText(instance)):
4025
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4026
                                   " node '%s'" %
4027
                                   (new_lv.logical_id[1], tgt_node))
4028

    
4029
    # Step: for each lv, detach+rename*2+attach
4030
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4031
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4032
      info("detaching %s drbd from local storage" % dev.iv_name)
4033
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4034
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4035
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4036
      #dev.children = []
4037
      #cfg.Update(instance)
4038

    
4039
      # ok, we created the new LVs, so now we know we have the needed
4040
      # storage; as such, we proceed on the target node to rename
4041
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4042
      # using the assumption that logical_id == physical_id (which in
4043
      # turn is the unique_id on that node)
4044

    
4045
      # FIXME(iustin): use a better name for the replaced LVs
4046
      temp_suffix = int(time.time())
4047
      ren_fn = lambda d, suff: (d.physical_id[0],
4048
                                d.physical_id[1] + "_replaced-%s" % suff)
4049
      # build the rename list based on what LVs exist on the node
4050
      rlist = []
4051
      for to_ren in old_lvs:
4052
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4053
        if find_res is not None: # device exists
4054
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4055

    
4056
      info("renaming the old LVs on the target node")
4057
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4058
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4059
      # now we rename the new LVs to the old LVs
4060
      info("renaming the new LVs on the target node")
4061
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4062
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4063
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4064

    
4065
      for old, new in zip(old_lvs, new_lvs):
4066
        new.logical_id = old.logical_id
4067
        cfg.SetDiskID(new, tgt_node)
4068

    
4069
      for disk in old_lvs:
4070
        disk.logical_id = ren_fn(disk, temp_suffix)
4071
        cfg.SetDiskID(disk, tgt_node)
4072

    
4073
      # now that the new lvs have the old name, we can add them to the device
4074
      info("adding new mirror component on %s" % tgt_node)
4075
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4076
        for new_lv in new_lvs:
4077
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4078
            warning("Can't rollback device %s", hint="manually cleanup unused"
4079
                    " logical volumes")
4080
        raise errors.OpExecError("Can't add local storage to drbd")
4081

    
4082
      dev.children = new_lvs
4083
      cfg.Update(instance)
4084

    
4085
    # Step: wait for sync
4086

    
4087
    # this can fail as the old devices are degraded and _WaitForSync
4088
    # does a combined result over all disks, so we don't check its
4089
    # return value
4090
    self.proc.LogStep(5, steps_total, "sync devices")
4091
    _WaitForSync(self, instance, unlock=True)
4092

    
4093
    # so check manually all the devices
4094
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4095
      cfg.SetDiskID(dev, instance.primary_node)
4096
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4097
      if is_degr:
4098
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4099

    
4100
    # Step: remove old storage
4101
    self.proc.LogStep(6, steps_total, "removing old storage")
4102
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4103
      info("remove logical volumes for %s" % name)
4104
      for lv in old_lvs:
4105
        cfg.SetDiskID(lv, tgt_node)
4106
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
4107
          warning("Can't remove old LV", hint="manually remove unused LVs")
4108
          continue
4109

    
4110
  def _ExecD8Secondary(self, feedback_fn):
4111
    """Replace the secondary node for drbd8.
4112

4113
    The algorithm for replace is quite complicated:
4114
      - for all disks of the instance:
4115
        - create new LVs on the new node with same names
4116
        - shutdown the drbd device on the old secondary
4117
        - disconnect the drbd network on the primary
4118
        - create the drbd device on the new secondary
4119
        - network attach the drbd on the primary, using an artifice:
4120
          the drbd code for Attach() will connect to the network if it
4121
          finds a device which is connected to the good local disks but
4122
          not network enabled
4123
      - wait for sync across all devices
4124
      - remove all disks from the old secondary
4125

4126
    Failures are not very well handled.
4127

4128
    """
4129
    steps_total = 6
4130
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4131
    instance = self.instance
4132
    iv_names = {}
4133
    vgname = self.cfg.GetVGName()
4134
    # start of work
4135
    cfg = self.cfg
4136
    old_node = self.tgt_node
4137
    new_node = self.new_node
4138
    pri_node = instance.primary_node
4139

    
4140
    # Step: check device activation
4141
    self.proc.LogStep(1, steps_total, "check device existence")
4142
    info("checking volume groups")
4143
    my_vg = cfg.GetVGName()
4144
    results = self.rpc.call_vg_list([pri_node, new_node])
4145
    if not results:
4146
      raise errors.OpExecError("Can't list volume groups on the nodes")
4147
    for node in pri_node, new_node:
4148
      res = results.get(node, False)
4149
      if not res or my_vg not in res:
4150
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4151
                                 (my_vg, node))
4152
    for dev in instance.disks:
4153
      if not dev.iv_name in self.op.disks:
4154
        continue
4155
      info("checking %s on %s" % (dev.iv_name, pri_node))
4156
      cfg.SetDiskID(dev, pri_node)
4157
      if not self.rpc.call_blockdev_find(pri_node, dev):
4158
        raise errors.OpExecError("Can't find device %s on node %s" %
4159
                                 (dev.iv_name, pri_node))
4160

    
4161
    # Step: check other node consistency
4162
    self.proc.LogStep(2, steps_total, "check peer consistency")
4163
    for dev in instance.disks:
4164
      if not dev.iv_name in self.op.disks:
4165
        continue
4166
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4167
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4168
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4169
                                 " unsafe to replace the secondary" %
4170
                                 pri_node)
4171

    
4172
    # Step: create new storage
4173
    self.proc.LogStep(3, steps_total, "allocate new storage")
4174
    for dev in instance.disks:
4175
      size = dev.size
4176
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4177
      # since we *always* want to create this LV, we use the
4178
      # _Create...OnPrimary (which forces the creation), even if we
4179
      # are talking about the secondary node
4180
      for new_lv in dev.children:
4181
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4182
                                        _GetInstanceInfoText(instance)):
4183
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4184
                                   " node '%s'" %
4185
                                   (new_lv.logical_id[1], new_node))
4186

    
4187

    
4188
    # Step 4: drbd minors and drbd setup changes
4189
    # after this, we must manually remove the drbd minors on both the
4190
    # error and the success paths
4191
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s", minors)
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev, new_minor in zip(instance.disks, minors):
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can now remove the temp minors, as the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master and the primary node of the instance.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


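# Illustrative sketch only (the exact opcode class name is an assumption,
# not taken from this module): a grow request carrying the fields listed
# in _OP_REQP above would look roughly like
#
#   op = opcodes.OpGrowDisk(instance_name="inst1.example.com", disk="sda",
#                           amount=1024, wait_for_sync=True)
#
# with the amount expressed in MiB, as checked against vg_free above.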
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


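# For reference, LUQueryInstanceData.Exec above returns a mapping from
# instance name to the idict built per instance, roughly of the shape
# (values illustrative):
#
#   {"inst1.example.com": {"name": "inst1.example.com",
#                          "config_state": "up", "run_state": "up",
#                          "pnode": "node1.example.com", "snodes": [...],
#                          "os": "...", "nics": [...], "disks": [...],
#                          "hypervisor": "...", "network_port": ...,
#                          "hv_instance": {...}, "hv_actual": {...},
#                          "be_instance": {...}, "be_actual": {...}}}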
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "hvparams"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE


  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.ip, self.bridge, self.mac]
    if (all_parms.count(None) == len(all_parms) and
        not self.op.hvparams and
        not self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
      val = self.op.beparams.get(item, None)
      if val is not None:
        try:
          val = int(val)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
        self.op.beparams[item] = val
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

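    # For both the hvparams and beparams handling below the same convention
    # applies: a value of None in the submitted dict removes the
    # instance-level override (so the cluster default applies again), and
    # FillDict then merges the cluster defaults with the remaining
    # instance-level values to produce the effective ("actual") parameters.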
    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val is None:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


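# The Exec method above reports its changes as a list of (parameter, new
# value) pairs, for example (values illustrative only):
#
#   [("ip", "192.0.2.10"), ("bridge", "xen-br0"),
#    ("hv/kernel_path", "/boot/vmlinuz"), ("be/memory", 512)]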
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return self.rpc.call_export_list(self.nodes)


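# As described in the docstring above, the export list comes back keyed by
# node name, e.g. (illustrative only):
#
#   {"node1.example.com": ["inst1.example.com"],
#    "node2.example.com": []}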
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
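    # Overview of the steps below: optionally shut the instance down, take
    # an LVM snapshot of every disk (restarting the instance afterwards if
    # it was running), copy each snapshot to the target node, remove the
    # snapshots from the source node, finalize the export on the target
    # node, and finally prune any older export of the same instance from
    # the other nodes.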
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

        if not new_dev_name:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name),
                                 physical_id=(vgname, new_dev_name),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
      if dev:
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                             instance, cluster_name):
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        if not self.rpc.call_blockdev_remove(src_node, dev):
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal;
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


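# A successful tag search above yields (path, tag) pairs such as
# (illustrative only):
#
#   [("/cluster", "production"),
#    ("/instances/inst1.example.com", "production")]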
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

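  # Illustrative usage sketch only (the concrete values are made up): an
  # allocation request must supply every key in _ALLO_KEYS, e.g.
  #
  #   ial = IAllocator(lu, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8, os="debian-etch",
  #                    tags=[], vcpus=1,
  #                    nics=[{"mac": "aa:00:00:00:00:01", "ip": None,
  #                           "bridge": "xen-br0"}])
  #
  # while a relocation request only needs the name and relocate_from keys.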
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }

    i_list = []
    cluster = cluster_info
    for iname in cfg.GetInstanceList():
      i_obj = cfg.GetInstanceInfo(iname)
      i_list.append((i_obj, cluster.FillBE(i_obj)))

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    # FIXME: here we have only one hypervisor's information, but
    # instances can belong to different hypervisors
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           cfg.GetHypervisorType())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

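  # The in_data dict assembled above has the following top-level shape
  # (values illustrative):
  #
  #   {"version": 1,
  #    "cluster_name": "...", "cluster_tags": [...],
  #    "enable_hypervisors": [...],
  #    "nodes": {node_name: {"total_memory": ..., "free_memory": ...,
  #                          "i_pri_memory": ..., "total_disk": ...,
  #                          "free_disk": ..., "total_cpus": ..., ...}},
  #    "instances": {instance_name: {"should_run": ..., "vcpus": ...,
  #                                  "memory": ..., "disks": [...],
  #                                  "nics": [...], ...}}}
  #
  # The _Add*Instance methods below add the "request" key on top of this.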
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


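# _ValidateResult above only requires the allocator output text (parsed via
# serializer.Load) to be a dict with the keys "success", "info" and "nodes"
# (the latter a list), e.g. (illustrative only):
#
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}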
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result